You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1081 lines
42 KiB
Python
1081 lines
42 KiB
Python
# Copyright (C) 2022 CVAT.ai Corporation
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
# CVAT REST API
|
|
#
|
|
# REST API for Computer Vision Annotation Tool (CVAT) # noqa: E501
|
|
#
|
|
# The version of the OpenAPI document: alpha (2.0)
|
|
# Contact: support@cvat.ai
|
|
# Generated by: https://openapi-generator.tech
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
import atexit
|
|
import importlib
|
|
import io
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import typing
|
|
from multiprocessing.pool import ThreadPool
|
|
from typing import TYPE_CHECKING
|
|
from urllib.parse import quote
|
|
|
|
from urllib3 import HTTPResponse
|
|
from urllib3.fields import RequestField
|
|
|
|
from cvat_sdk import rest
|
|
from cvat_sdk.configuration import Configuration
|
|
from cvat_sdk.exceptions import ApiException, ApiTypeError, ApiValueError
|
|
from cvat_sdk.model_utils import (
|
|
ModelComposed,
|
|
ModelNormal,
|
|
ModelSimple,
|
|
check_allowed_values,
|
|
check_validations,
|
|
date,
|
|
datetime,
|
|
deserialize_file,
|
|
file_type,
|
|
model_to_dict,
|
|
none_type,
|
|
validate_and_convert_types,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
# Enable introspection. Can't work normally due to cyclic imports
|
|
from cvat_sdk.apis import *
|
|
from cvat_sdk.models import *
|
|
|
|
|
|
class ApiClient(object):
|
|
"""Generic API client for OpenAPI client library builds.
|
|
|
|
OpenAPI generic API client. This client handles the client-
|
|
server communication, and is invariant across implementations. Specifics of
|
|
the methods and models for each application are generated from the OpenAPI
|
|
templates.
|
|
|
|
NOTE: This class is auto generated by OpenAPI Generator.
|
|
Ref: https://openapi-generator.tech
|
|
Do not edit the class manually.
|
|
|
|
Class members:
|
|
|
|
auth_api: AuthApi
|
|
cloud_storages_api: CloudStoragesApi
|
|
comments_api: CommentsApi
|
|
invitations_api: InvitationsApi
|
|
issues_api: IssuesApi
|
|
jobs_api: JobsApi
|
|
lambda_api: LambdaApi
|
|
memberships_api: MembershipsApi
|
|
organizations_api: OrganizationsApi
|
|
projects_api: ProjectsApi
|
|
restrictions_api: RestrictionsApi
|
|
schema_api: SchemaApi
|
|
server_api: ServerApi
|
|
tasks_api: TasksApi
|
|
users_api: UsersApi
|
|
"""
|
|
|
|
# Worker pool for asynchronous requests. It stays None until the `pool`
# property is first read, and is reset to None again by `close()`.
_pool = None
|
|
|
|
def __init__(
    self, configuration=None, header_name=None, header_value=None, cookie=None, pool_threads=1
):
    """Initialise client state and the underlying REST transport.

    :param configuration: configuration object for this client; when
        omitted, ``Configuration.get_default_copy()`` is used.
    :param header_name: name of a header to send with every API call.
    :param header_value: value for ``header_name``.
    :param cookie: a cookie to include in the header of every API call.
    :param pool_threads: number of threads used for async requests.
        More threads means more concurrent API requests.
    """
    self.configuration = (
        Configuration.get_default_copy() if configuration is None else configuration
    )
    self.pool_threads = pool_threads
    self.rest_client = rest.RESTClientObject(self.configuration)
    self.cookie = cookie

    # The user_agent property stores its value inside default_headers, so
    # the dict has to exist before the property is assigned below.
    self.default_headers = {}
    if header_name is not None:
        self.default_headers[header_name] = header_value

    # Default User-Agent; assigned last, so it replaces a "User-Agent"
    # entry supplied through header_name.
    self.user_agent = "OpenAPI-Generator/2.0-alpha/python"
|
|
|
|
def __enter__(self):
    """Enter a ``with`` block; the client itself is the context value."""
    return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
    """Leave a ``with`` block by calling ``close()``.

    The exception arguments are ignored and nothing is returned, so an
    exception raised inside the block keeps propagating.
    """
    self.close()
|
|
|
|
def close(self):
|
|
if self._pool:
|
|
self._pool.close()
|
|
self._pool.join()
|
|
self._pool = None
|
|
if hasattr(atexit, "unregister"):
|
|
atexit.unregister(self.close)
|
|
|
|
@property
|
|
def pool(self):
|
|
"""Create thread pool on first request
|
|
avoids instantiating unused threadpool for blocking clients.
|
|
"""
|
|
if self._pool is None:
|
|
atexit.register(self.close)
|
|
self._pool = ThreadPool(self.pool_threads)
|
|
return self._pool
|
|
|
|
@property
def user_agent(self):
    """User agent for this API client.

    Read from the "User-Agent" entry of ``default_headers``.
    """
    return self.default_headers["User-Agent"]

@user_agent.setter
def user_agent(self, value):
    # Stored as a default header, so it is sent with every request.
    self.default_headers["User-Agent"] = value
|
|
|
|
def _serialize_post_parameter(self, obj):
    """Turn one form value into a ``(filename, data, content_type)`` triple.

    Primitives (str, int, float, bool, None) are JSON-encoded with an empty
    filename. File objects are delegated to ``_serialize_file``.

    :param obj: the value to encode.
    :raises ApiValueError: for any other type.
    """
    if isinstance(obj, (str, int, float, none_type, bool)):
        encoded = json.dumps(obj)
        return ("", encoded, "application/json")

    if isinstance(obj, io.IOBase):
        return self._serialize_file(obj)

    type_name = obj.__class__.__name__
    raise ApiValueError("Unable to prepare type {} for serialization".format(type_name))
|
|
|
|
def _convert_body_to_post_params(self, body):
    """Flatten a request body into multipart form fields.

    The body must sanitize to a flat dict; values may be primitives, files,
    or lists/tuples of those. Sequence items become separate fields named
    ``key[index]``.

    :param body: request body (model or dict).
    :return: list of ``(field_name, (filename, data, content_type))`` tuples.
    """
    # Files are kept as objects here; _serialize_post_parameter reads them.
    flat = self.sanitize_for_serialization(body, read_files=False)
    assert isinstance(flat, dict), type(flat)

    form_fields = []
    for field_name, field_value in flat.items():
        if not isinstance(field_value, (tuple, list)):
            form_fields.append((field_name, self._serialize_post_parameter(field_value)))
            continue

        form_fields.extend(
            (f"{field_name}[{index}]", self._serialize_post_parameter(item))
            for index, item in enumerate(field_value)
        )
    return form_fields
|
|
|
|
def set_default_header(self, header_name, header_value):
    """Add or replace an entry in ``default_headers``.

    :param header_name: header name.
    :param header_value: header value.
    """
    self.default_headers[header_name] = header_value
|
|
|
|
def __call_api(
    self,
    resource_path: str,
    method: str,
    path_params: typing.Optional[typing.Dict[str, typing.Any]] = None,
    query_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None,
    header_params: typing.Optional[typing.Dict[str, typing.Any]] = None,
    body: typing.Optional[typing.Any] = None,
    post_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None,
    files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None,
    response_schema: typing.Optional[typing.Tuple[typing.Any]] = None,
    auth_settings: typing.Optional[typing.List[str]] = None,
    collection_formats: typing.Optional[typing.Dict[str, str]] = None,
    *,
    _parse_response: bool = True,
    _request_timeout: typing.Optional[typing.Union[int, float, typing.Tuple]] = None,
    _host: typing.Optional[str] = None,
    _check_type: typing.Optional[bool] = None,
    _check_status: bool = True,
    _request_auths: typing.Optional[typing.List[typing.Dict[str, typing.Any]]] = None,
):
    """Prepare, send and optionally deserialize one HTTP request.

    Parameters have the same meaning as in ``call_api``. Each parameter
    group is sanitized, auth settings are applied, and the request is sent
    through ``self.request``. The raw response is stored in
    ``self.last_response``.

    :return: ``(return_data, response)``. ``return_data`` is ``None``
        unless ``_parse_response`` is true and ``response_schema`` is given.
    :raises ApiException: re-raised from the request, with a ``bytes``
        body decoded to text.
    """
    config = self.configuration

    # header parameters
    # Merge into a fresh dict so the caller's mapping is never mutated.
    # Default headers still win over per-request ones, as before.
    header_params = {**(header_params or {}), **self.default_headers}
    if self.cookie:
        header_params["Cookie"] = self.cookie
    if header_params:
        header_params = self.sanitize_for_serialization(header_params)
        header_params = dict(self.parameters_to_tuples(header_params, collection_formats))

    # path parameters
    if path_params:
        path_params = self.sanitize_for_serialization(path_params)
        path_params = self.parameters_to_tuples(path_params, collection_formats)
        for k, v in path_params:
            # specified safe chars, encode everything
            resource_path = resource_path.replace(
                "{%s}" % k, quote(str(v), safe=config.safe_chars_for_path_param)
            )

    # query parameters
    if query_params:
        query_params = self.sanitize_for_serialization(query_params)
        query_params = self.parameters_to_tuples(query_params, collection_formats)

    # post parameters
    post_params = post_params if post_params else []
    if post_params or files:
        post_params = self.sanitize_for_serialization(post_params)
        post_params = self.parameters_to_tuples(post_params, collection_formats)
        post_params.extend(self.files_parameters(files))

    if header_params.get("Content-Type", "").startswith("multipart"):
        # For multipart requests the body is sent as individual form fields.
        if body:
            post_params.extend(self._convert_body_to_post_params(body))
            body = None

        if post_params:
            post_params = self.parameters_to_multipart(post_params, (dict))
    else:
        # body
        if body:
            body = self.sanitize_for_serialization(body)

    # auth setting
    self.update_params_for_auth(
        header_params,
        query_params,
        auth_settings,
        resource_path,
        method,
        body,
        request_auths=_request_auths,
    )

    # request url
    if _host is None:
        url = self.configuration.host + resource_path
    else:
        # use server/host defined in path or operation instead
        url = _host + resource_path

    try:
        # perform request and return response
        response = self.request(
            method,
            url,
            query_params=query_params,
            headers=header_params,
            post_params=post_params,
            body=body,
            _parse_response=_parse_response,
            _request_timeout=_request_timeout,
            _check_status=_check_status,
        )
    except ApiException as e:
        # The body is only decoded when it is raw bytes. It can be None or
        # already text, and an unconditional .decode() would then replace
        # the real API error with an AttributeError. Undecodable bytes are
        # replaced rather than raising a second error.
        if isinstance(e.body, (bytes, bytearray)):
            e.body = e.body.decode("utf-8", errors="replace")
        raise

    self.last_response = response

    return_data = None
    if _parse_response and response_schema:
        return_data = self.deserialize(response, response_schema, _check_type=_check_type)

    return (return_data, response)
|
|
|
|
def parameters_to_multipart(self, params, collection_types):
    """Convert collection-valued parameters into JSON multipart fields.

    :param params: parameters as a dict or a list of two-tuples.
    :param collection_types: type (or tuple of types) whose instances are
        JSON-encoded; ``None`` means ``dict``.
    :return: list in which matching values are replaced by a
        ``urllib3.fields.RequestField`` and all others stay ``(name, value)``.
    """
    if collection_types is None:
        collection_types = dict

    pairs = params.items() if isinstance(params, dict) else params

    converted = []
    for name, value in pairs:
        if not isinstance(value, collection_types):
            converted.append((name, value))
            continue

        # Collections are sent as an application/json part.
        payload = json.dumps(value, ensure_ascii=False).encode("utf-8")
        part = RequestField(name, payload)
        part.make_multipart(content_type="application/json; charset=utf-8")
        converted.append(part)
    return converted
|
|
|
|
@classmethod
def sanitize_for_serialization(cls, obj, *, read_files: bool = True):
    """Recursively convert data into a form the REST client can send.

    Conversion rules, checked in this order:
      * ModelNormal / ModelComposed -> dict of sanitized properties
      * io.IOBase -> its bytes (file is read and closed) when
        ``read_files`` is true, otherwise the file object itself
      * str, int, float, bool, None -> returned unchanged
      * datetime / date -> ISO 8601 string
      * ModelSimple -> its sanitized ``value``
      * list / tuple -> list of sanitized items
      * dict -> dict with sanitized values

    :param obj: the data to serialize.
    :param read_files: whether to read file data or leave files as is.
    :return: the serialized form of the data.
    :raises ApiValueError: if the type is not supported.
    """

    def _clean(value):
        # Recurse with the same file-handling mode.
        return cls.sanitize_for_serialization(value, read_files=read_files)

    if isinstance(obj, (ModelNormal, ModelComposed)):
        properties = model_to_dict(obj, serialize=True)
        return {name: _clean(item) for name, item in properties.items()}

    if isinstance(obj, io.IOBase):
        return cls.get_file_data_and_close_file(obj) if read_files else obj

    if isinstance(obj, (str, int, float, none_type, bool)):
        return obj

    if isinstance(obj, (datetime, date)):
        return obj.isoformat()

    if isinstance(obj, ModelSimple):
        return _clean(obj.value)

    if isinstance(obj, (list, tuple)):
        return [_clean(item) for item in obj]

    if isinstance(obj, dict):
        return {name: _clean(item) for name, item in obj.items()}

    raise ApiValueError(
        "Unable to prepare type {} for serialization".format(obj.__class__.__name__)
    )
|
|
|
|
def deserialize(
    self, response: HTTPResponse, response_schema: typing.Tuple, *, _check_type: bool
):
    """Convert an HTTP response into the object described by a schema.

    :param response: the ``urllib3.HTTPResponse`` to deserialize.
    :param response_schema: a tuple containing valid classes, a list of
        valid classes (for list schemas), or a dict whose value is a tuple
        of valid classes. Example values:
            (str,)
            (Pet,)
            (float, none_type)
            ([int, none_type],)
            ({str: (bool, str, int, float, date, datetime, str, none_type)},)
    :param _check_type: whether to check the types of the data received
        from the server.
    :return: the deserialized object.
    """
    if response_schema == (file_type,):
        # File download: the body is handed to deserialize_file together
        # with the Content-Disposition header.
        content_disposition = response.getheader("Content-Disposition")
        return deserialize_file(
            response.data, self.configuration, content_disposition=content_disposition
        )

    # Use the charset from the Content-Type header, falling back to UTF-8.
    content_type = response.getheader("content-type")
    charset = None
    if content_type is not None:
        charset = re.search(r"charset=([a-zA-Z\-\d]+)[\s\;]?", content_type)
    encoding = charset.group(1) if charset else "utf-8"
    text = response.data.decode(encoding)

    # Bodies that are not valid JSON are passed on as plain text.
    try:
        payload = json.loads(text)
    except ValueError:
        payload = text

    # The 'received_data' path element gives users context in error
    # messages when the data type is wrong.
    return validate_and_convert_types(
        payload,
        response_schema,
        ["received_data"],
        True,
        _check_type,
        configuration=self.configuration,
    )
|
|
|
|
def call_api(
|
|
self,
|
|
resource_path: str,
|
|
method: str,
|
|
path_params: typing.Optional[typing.Dict[str, typing.Any]] = None,
|
|
query_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None,
|
|
header_params: typing.Optional[typing.Dict[str, typing.Any]] = None,
|
|
body: typing.Optional[typing.Any] = None,
|
|
post_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None,
|
|
files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None,
|
|
response_schema: typing.Optional[typing.Tuple[typing.Any]] = None,
|
|
auth_settings: typing.Optional[typing.List[str]] = None,
|
|
collection_formats: typing.Optional[typing.Dict[str, str]] = None,
|
|
*,
|
|
_async_call: typing.Optional[bool] = None,
|
|
_parse_response: bool = True,
|
|
_request_timeout: typing.Optional[typing.Union[int, float, typing.Tuple]] = None,
|
|
_host: typing.Optional[str] = None,
|
|
_check_type: typing.Optional[bool] = None,
|
|
_request_auths: typing.Optional[typing.List[typing.Dict[str, typing.Any]]] = None,
|
|
_check_status: bool = True,
|
|
):
|
|
"""Makes the HTTP request (synchronous) and returns deserialized data.
|
|
|
|
To make an _async_call request, set the _async_call parameter.
|
|
|
|
:param resource_path: Path to method endpoint.
|
|
:param method: Method to call.
|
|
:param path_params: Path parameters in the url.
|
|
:param query_params: Query parameters in the url.
|
|
:param header_params: Header parameters to be
|
|
placed in the request header.
|
|
:param body: Request body.
|
|
:param post_params dict: Request post form parameters,
|
|
for `application/x-www-form-urlencoded`, `multipart/form-data`.
|
|
:param auth_settings list: Auth Settings names for the request.
|
|
:param response_schema: For the response, a tuple containing:
|
|
valid classes
|
|
a list containing valid classes (for list schemas)
|
|
a dict containing a tuple of valid classes as the value
|
|
Example values:
|
|
(str,)
|
|
(Pet,)
|
|
(float, none_type)
|
|
([int, none_type],)
|
|
({str: (bool, str, int, float, date, datetime, str, none_type)},)
|
|
:param files: key -> field name, value -> a list of open file
|
|
objects for `multipart/form-data`.
|
|
:type files: dict
|
|
:param _async_call bool: execute request asynchronously
|
|
:type _async_call: bool, optional
|
|
:param collection_formats: dict of collection formats for path, query,
|
|
header, and post parameters.
|
|
:type collection_formats: dict, optional
|
|
:param _parse_response: if False, the urllib3.HTTPResponse object will
|
|
be returned without reading/decoding response
|
|
data. Default is True.
|
|
:type _parse_response: bool, optional
|
|
:param _request_timeout: timeout setting for this request. If one
|
|
number provided, it will be total request
|
|
timeout. It can also be a pair (tuple) of
|
|
(connection, read) timeouts.
|
|
:param _check_type: boolean describing if the data back from the server
|
|
should have its type checked.
|
|
:type _check_type: bool, optional
|
|
:param _request_auths: set to override the auth_settings for an a single
|
|
request; this effectively ignores the authentication
|
|
in the spec for a single request.
|
|
:type _request_auths: list, optional
|
|
:return:
|
|
If _async_call parameter is True,
|
|
the request will be called asynchronously.
|
|
The method will return the request thread.
|
|
If parameter _async_call is False or missing,
|
|
then the method will return the response directly.
|
|
"""
|
|
params = {
|
|
"resource_path": resource_path,
|
|
"method": method,
|
|
"path_params": path_params,
|
|
"query_params": query_params,
|
|
"header_params": header_params,
|
|
"body": body,
|
|
"post_params": post_params,
|
|
"files": files,
|
|
"response_schema": response_schema,
|
|
"auth_settings": auth_settings,
|
|
"collection_formats": collection_formats,
|
|
"_parse_response": _parse_response,
|
|
"_request_timeout": _request_timeout,
|
|
"_host": _host,
|
|
"_check_type": _check_type,
|
|
"_request_auths": _request_auths,
|
|
"_check_status": _check_status,
|
|
}
|
|
|
|
if not _async_call:
|
|
return self.__call_api(**params)
|
|
|
|
return self.pool.apply_async(self.__call_api, (), kwds=params)
|
|
|
|
def request(
|
|
self,
|
|
method,
|
|
url,
|
|
query_params=None,
|
|
headers=None,
|
|
post_params=None,
|
|
body=None,
|
|
*,
|
|
_parse_response=True,
|
|
_request_timeout=None,
|
|
_check_status=True,
|
|
):
|
|
"""Makes the HTTP request using RESTClient."""
|
|
if method == "GET":
|
|
return self.rest_client.GET(
|
|
url,
|
|
query_params=query_params,
|
|
_parse_response=_parse_response,
|
|
_request_timeout=_request_timeout,
|
|
headers=headers,
|
|
_check_status=_check_status,
|
|
)
|
|
elif method == "HEAD":
|
|
return self.rest_client.HEAD(
|
|
url,
|
|
query_params=query_params,
|
|
_parse_response=_parse_response,
|
|
_request_timeout=_request_timeout,
|
|
headers=headers,
|
|
_check_status=_check_status,
|
|
)
|
|
elif method == "OPTIONS":
|
|
return self.rest_client.OPTIONS(
|
|
url,
|
|
query_params=query_params,
|
|
headers=headers,
|
|
post_params=post_params,
|
|
_parse_response=_parse_response,
|
|
_request_timeout=_request_timeout,
|
|
_check_status=_check_status,
|
|
body=body,
|
|
)
|
|
elif method == "POST":
|
|
return self.rest_client.POST(
|
|
url,
|
|
query_params=query_params,
|
|
headers=headers,
|
|
post_params=post_params,
|
|
_parse_response=_parse_response,
|
|
_request_timeout=_request_timeout,
|
|
_check_status=_check_status,
|
|
body=body,
|
|
)
|
|
elif method == "PUT":
|
|
return self.rest_client.PUT(
|
|
url,
|
|
query_params=query_params,
|
|
headers=headers,
|
|
post_params=post_params,
|
|
_parse_response=_parse_response,
|
|
_request_timeout=_request_timeout,
|
|
_check_status=_check_status,
|
|
body=body,
|
|
)
|
|
elif method == "PATCH":
|
|
return self.rest_client.PATCH(
|
|
url,
|
|
query_params=query_params,
|
|
headers=headers,
|
|
post_params=post_params,
|
|
_parse_response=_parse_response,
|
|
_request_timeout=_request_timeout,
|
|
_check_status=_check_status,
|
|
body=body,
|
|
)
|
|
elif method == "DELETE":
|
|
return self.rest_client.DELETE(
|
|
url,
|
|
query_params=query_params,
|
|
headers=headers,
|
|
_parse_response=_parse_response,
|
|
_request_timeout=_request_timeout,
|
|
_check_status=_check_status,
|
|
body=body,
|
|
)
|
|
else:
|
|
raise ApiValueError(
|
|
"http method must be `GET`, `HEAD`, `OPTIONS`,"
|
|
" `POST`, `PATCH`, `PUT` or `DELETE`."
|
|
)
|
|
|
|
def parameters_to_tuples(self, params, collection_formats):
    """Flatten parameters into a list of tuples, formatting collections.

    :param params: parameters as a dict or a list of two-tuples.
    :param collection_formats: dict mapping a parameter name to its
        collection format. "multi" emits one tuple per item, while "ssv",
        "tsv" and "pipes" join items with a space, tab or "|". Any other
        format joins with "," (csv).
    :return: parameters as a list of tuples, collections formatted.
    """
    separators = {"ssv": " ", "tsv": "\t", "pipes": "|"}
    formats = {} if collection_formats is None else collection_formats
    entries = params.items() if isinstance(params, dict) else params

    flattened = []
    for name, value in entries:
        if name not in formats:
            flattened.append((name, value))
            continue

        chosen = formats[name]
        if chosen == "multi":
            flattened.extend((name, item) for item in value)
        else:
            # csv is the default
            separator = separators.get(chosen, ",")
            flattened.append((name, separator.join(str(item) for item in value)))
    return flattened
|
|
|
|
@staticmethod
|
|
def get_file_data_and_close_file(file_instance: io.IOBase) -> bytes:
|
|
file_data = file_instance.read()
|
|
file_instance.close()
|
|
return file_data
|
|
|
|
def _serialize_file(
|
|
self, file_instance: io.IOBase
|
|
) -> typing.Tuple[str, typing.Union[str, bytes], str]:
|
|
if file_instance.closed is True:
|
|
raise ApiValueError("Cannot read a closed file.")
|
|
filename = os.path.basename(file_instance.name)
|
|
|
|
filedata = self.get_file_data_and_close_file(file_instance)
|
|
|
|
mimetype = mimetypes.guess_type(filename)[0] or "application/octet-stream"
|
|
|
|
return filename, filedata, mimetype
|
|
|
|
def files_parameters(
|
|
self, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None
|
|
):
|
|
"""Builds form parameters.
|
|
|
|
:param files: None or a dict with key=param_name and
|
|
value is a list of open file objects
|
|
:return: List of tuples of form parameters with file data
|
|
"""
|
|
if files is None:
|
|
return []
|
|
|
|
params = []
|
|
for param_name, file_instances in files.items():
|
|
if file_instances is None:
|
|
# if the file field is nullable, skip None values
|
|
continue
|
|
for file_instance in file_instances:
|
|
if file_instance is None:
|
|
# if the file field is nullable, skip None values
|
|
continue
|
|
|
|
try:
|
|
params.append((param_name, self._serialize_file(file_instance)))
|
|
except ApiValueError as e:
|
|
raise ApiValueError(
|
|
"The passed in file_type " "for %s must be open." % param_name
|
|
) from e
|
|
|
|
return params
|
|
|
|
def select_header_accept(self, accepts):
    """Choose the ``Accept`` header value from the given list.

    :param accepts: list of acceptable media types.
    :return: ``None`` for an empty list; "application/json" if it is among
        the (lower-cased) entries; otherwise all entries joined by ", ".
    """
    if not accepts:
        return

    lowered = [entry.lower() for entry in accepts]
    return "application/json" if "application/json" in lowered else ", ".join(lowered)
|
|
|
|
def select_header_content_type(self, content_types, method=None, body=None):
    """Choose the ``Content-Type`` header value from the given list.

    :param content_types: list of content types.
    :param method: http method (e.g. POST, PATCH).
    :param body: http body to send.
    :return: ``None`` for an empty list. "application/json-patch+json" for
        a PATCH with a list body when that type is offered.
        "application/json" when it or "*/*" is offered. Otherwise the first
        (lower-cased) entry.
    """
    if not content_types:
        return None

    lowered = [entry.lower() for entry in content_types]

    # JSON Patch is only chosen for PATCH requests whose body is a list.
    wants_json_patch = (
        method == "PATCH"
        and "application/json-patch+json" in lowered
        and isinstance(body, list)
    )
    if wants_json_patch:
        return "application/json-patch+json"

    if "application/json" in lowered or "*/*" in lowered:
        return "application/json"
    return lowered[0]
|
|
|
|
def update_params_for_auth(
|
|
self, headers, queries, auth_settings, resource_path, method, body, request_auths=None
|
|
):
|
|
"""Updates header and query params based on authentication setting.
|
|
|
|
:param headers: Header parameters dict to be updated.
|
|
:param queries: Query parameters tuple list to be updated.
|
|
:param auth_settings: Authentication setting identifiers list.
|
|
:param resource_path: A string representation of the HTTP request resource path.
|
|
:param method: A string representation of the HTTP request method.
|
|
:param body: A object representing the body of the HTTP request.
|
|
The object type is the return value of _encoder.default().
|
|
:param request_auths: if set, the provided settings will
|
|
override the token in the configuration.
|
|
"""
|
|
if not auth_settings:
|
|
return
|
|
|
|
if request_auths:
|
|
for auth_setting in request_auths:
|
|
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
|
return
|
|
|
|
for auth in auth_settings:
|
|
auth_setting = self.configuration.auth_settings().get(auth)
|
|
if auth_setting:
|
|
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
|
|
|
|
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting):
|
|
if auth_setting["in"] == "cookie":
|
|
headers["Cookie"] = auth_setting["key"] + "=" + auth_setting["value"]
|
|
elif auth_setting["in"] == "header":
|
|
if auth_setting["type"] != "http-signature":
|
|
headers[auth_setting["key"]] = auth_setting["value"]
|
|
elif auth_setting["in"] == "query":
|
|
queries.append((auth_setting["key"], auth_setting["value"]))
|
|
else:
|
|
raise ApiValueError("Authentication token must be in `query` or `header`")
|
|
|
|
# Type declarations for the API accessor attributes. No values are assigned
# here; the objects are created on first access by __getattr__.
auth_api: "AuthApi"
cloud_storages_api: "CloudStoragesApi"
comments_api: "CommentsApi"
invitations_api: "InvitationsApi"
issues_api: "IssuesApi"
jobs_api: "JobsApi"
lambda_api: "LambdaApi"
memberships_api: "MembershipsApi"
organizations_api: "OrganizationsApi"
projects_api: "ProjectsApi"
restrictions_api: "RestrictionsApi"
schema_api: "SchemaApi"
server_api: "ServerApi"
tasks_api: "TasksApi"
users_api: "UsersApi"

# Lookup table used by __getattr__: accessor attribute name ->
# [instance, API class name]. The instance slot is None for every entry;
# __getattr__ stores created instances on the client object with setattr
# rather than in this shared table.
_apis: typing.Dict[str, object] = {
    "auth_api": [None, "AuthApi"],
    "cloud_storages_api": [None, "CloudStoragesApi"],
    "comments_api": [None, "CommentsApi"],
    "invitations_api": [None, "InvitationsApi"],
    "issues_api": [None, "IssuesApi"],
    "jobs_api": [None, "JobsApi"],
    "lambda_api": [None, "LambdaApi"],
    "memberships_api": [None, "MembershipsApi"],
    "organizations_api": [None, "OrganizationsApi"],
    "projects_api": [None, "ProjectsApi"],
    "restrictions_api": [None, "RestrictionsApi"],
    "schema_api": [None, "SchemaApi"],
    "server_api": [None, "ServerApi"],
    "tasks_api": [None, "TasksApi"],
    "users_api": [None, "UsersApi"],
}
|
|
|
|
def _make_api_instance(self, klass_name):
    """Instantiate the API class ``klass_name`` bound to this client.

    The class is looked up in the ``apis`` module of the package that
    contains this module. Importing it here, at call time, avoids the
    cyclic import a module-level import would cause.

    :param klass_name: name of a class in ``<package>.apis``.
    :return: ``klass(self)``.
    """
    parent_package = __name__.rsplit(".", maxsplit=1)[0]
    apis_module = importlib.import_module(f"{parent_package}.apis")
    return getattr(apis_module, klass_name)(self)
|
|
|
|
def __getattr__(self, key):
|
|
notfound = object()
|
|
api_instance, api_klassname = self._apis.get(key, notfound)
|
|
if api_instance is notfound:
|
|
raise AttributeError(f"Can't find the '{key}' attribute")
|
|
|
|
if api_instance is None:
|
|
api_instance = self._make_api_instance(api_klassname)
|
|
setattr(self, key, api_instance)
|
|
|
|
return api_instance
|
|
|
|
|
|
class Endpoint(object):
|
|
def __init__(
    self,
    settings: typing.Optional[typing.Dict[str, typing.Any]] = None,
    params_map: typing.Optional[typing.Dict[str, typing.Any]] = None,
    root_map: typing.Optional[typing.Dict[str, typing.Any]] = None,
    headers_map: typing.Optional[typing.Dict[str, typing.Any]] = None,
    api_client: typing.Optional[ApiClient] = None,
):
    """Create an endpoint description.

    The passed ``params_map`` lists and the ``root_map`` type dict are
    extended in place with the underscore-prefixed control parameters that
    every endpoint accepts.

    Args:
        settings (dict): see below key value pairs
            'response_schema' (tuple/None): response type
            'auth' (list): a list of auth type keys
            'endpoint_path' (str): the endpoint path
            'operation_id' (str): endpoint string identifier
            'http_method' (str): POST/PUT/PATCH/GET etc
            'servers' (list): list of str servers that this endpoint is at
        params_map (dict): see below key value pairs
            'all' (list): list of str endpoint parameter names
            'required' (list): list of required parameter names
            'nullable' (list): list of nullable parameter names
            'enum' (list): list of parameters with enum values
            'validation' (list): list of parameters with validations
        root_map (dict):
            'validations' (dict): the dict mapping endpoint parameter tuple
                paths to their validation dictionaries
            'allowed_values' (dict): the dict mapping endpoint parameter
                tuple paths to their allowed_values (enum) dictionaries
            'openapi_types' (dict): param_name to openapi type
            'attribute_map' (dict): param_name to camelCase name
            'location_map' (dict): param_name to 'body', 'file', 'form',
                'header', 'path', 'query'
            'collection_format_map' (dict): param_name to `csv` etc.
        headers_map (dict): see below key value pairs
            'accept' (list): list of Accept header strings
            'content_type' (list): list of Content-Type header strings
        api_client (ApiClient): api client instance
    """
    self.settings = settings

    # Register the control parameters shared by all endpoints.
    self.params_map = params_map
    self.params_map["all"].extend(
        [
            "_async_call",
            "_host_index",
            "_parse_response",
            "_request_timeout",
            "_validate_inputs",
            "_validate_outputs",
            "_check_status",
            "_content_type",
            "_spec_property_naming",
            "_request_auths",
        ]
    )
    self.params_map["nullable"].extend(["_request_timeout"])

    self.validations = root_map["validations"]
    self.allowed_values = root_map["allowed_values"]

    # Accepted types for the control parameters, added next to the
    # endpoint's own parameter types.
    self.openapi_types = root_map["openapi_types"]
    self.openapi_types.update(
        _async_call=(bool,),
        _host_index=(none_type, int),
        _parse_response=(bool,),
        _request_timeout=(none_type, float, (float,), [float], int, (int,), [int]),
        _validate_inputs=(bool,),
        _validate_outputs=(bool,),
        _check_status=(bool,),
        _spec_property_naming=(bool,),
        _content_type=(none_type, str),
        _request_auths=(none_type, list),
    )

    self.attribute_map = root_map["attribute_map"]
    self.location_map = root_map["location_map"]
    self.collection_format_map = root_map["collection_format_map"]
    self.headers_map = headers_map
    self.api_client = api_client
|
|
|
|
def __validate_inputs(self, kwargs):
    """Validate call arguments and convert them to their declared types.

    Enum and validation checks always run for the parameters present in
    ``kwargs``. Type conversion is skipped when
    ``kwargs["_validate_inputs"]`` is ``False``. Converted values are
    written back into ``kwargs``.

    :param kwargs: the endpoint call arguments; modified in place.
    """
    for name in (p for p in self.params_map["enum"] if p in kwargs):
        check_allowed_values(self.allowed_values, (name,), kwargs[name])

    for name in (p for p in self.params_map["validation"] if p in kwargs):
        check_validations(
            self.validations,
            (name,),
            kwargs[name],
            configuration=self.api_client.configuration,
        )

    if kwargs["_validate_inputs"] is not False:
        # Only existing keys are reassigned, so iterating while updating is
        # safe.
        for key in kwargs:
            kwargs[key] = validate_and_convert_types(
                kwargs[key],
                self.openapi_types[key],
                [key],
                kwargs["_spec_property_naming"],
                kwargs["_validate_inputs"],
                configuration=self.api_client.configuration,
            )
|
|
|
|
def __gather_params(self, kwargs):
|
|
params = {
|
|
"body": None,
|
|
"collection_format": {},
|
|
"file": {},
|
|
"form": [],
|
|
"header": {},
|
|
"path": {},
|
|
"query": [],
|
|
}
|
|
|
|
for param_name, param_value in kwargs.items():
|
|
param_location = self.location_map.get(param_name)
|
|
if param_location is None:
|
|
continue
|
|
if param_location:
|
|
if param_location == "body":
|
|
params["body"] = param_value
|
|
continue
|
|
base_name = self.attribute_map[param_name]
|
|
if param_location == "form" and self.openapi_types[param_name] == (file_type,):
|
|
params["file"][base_name] = [param_value]
|
|
elif param_location == "form" and self.openapi_types[param_name] == ([file_type],):
|
|
# param_value is already a list
|
|
params["file"][base_name] = param_value
|
|
elif param_location in {"form", "query"}:
|
|
param_value_full = (base_name, param_value)
|
|
params[param_location].append(param_value_full)
|
|
if param_location not in {"form", "query"}:
|
|
params[param_location][base_name] = param_value
|
|
collection_format = self.collection_format_map.get(param_name)
|
|
if collection_format:
|
|
params["collection_format"][base_name] = collection_format
|
|
|
|
return params
|
|
|
|
def call_with_http_info(
    self, **kwargs
) -> typing.Tuple[typing.Optional[typing.Any], HTTPResponse]:
    """
    Check the call parameters, build the request and hand it to the API client.

    The underscore-prefixed options below are read by direct indexing, so
    the caller is expected to supply all of them; only ``_content_type``
    is read with ``.get``.

    Keyword Args:
        endpoint args: parameters of the operation. Every key must be
            listed in ``self.params_map["all"]`` and every key of
            ``self.params_map["required"]`` must be present.
        _host_index (int/None): index of the server to use. When None,
            the index is looked up in the client configuration for this
            operation.
        _validate_inputs (bool): specifies if type checking should be
            done on the data sent to the server. When False, a None
            value for a non-nullable parameter is rejected here instead
            of by the input validation step.
        _validate_outputs (bool): specifies if type checking should be
            done on the data received from the server; forwarded to the
            API client as ``_check_type``.
        _spec_property_naming (bool): True if the variable names in the
            input data are serialized names, as specified in the OpenAPI
            document; False if they are pythonic names, e.g. snake case.
        _content_type (str/None): force body content-type. When falsy,
            the content-type is selected from the allowed content-types
            and the body.
        _request_timeout (int/float/tuple): timeout setting for this
            request. If one number is provided, it is the total request
            timeout. It can also be a pair (tuple) of
            (connection, read) timeouts.
        _request_auths (list): overrides the auth settings for a single
            request; this effectively ignores the authentication in the
            spec for that request.
        _async_call (bool): execute the request asynchronously.
        _check_status: forwarded unchanged to the API client.
        _parse_response: forwarded unchanged to the API client.

    Returns:
        The result of ``self.api_client.call_api``; per the annotation,
        ``(parsed_model, response)``. The generated documentation states
        that the request thread is returned instead when the method is
        called asynchronously.

    Raises:
        ApiValueError: the host index is out of range for the servers of
            this endpoint, a required parameter is missing, or a
            non-nullable parameter is None while ``_validate_inputs`` is
            False.
        ApiTypeError: an unexpected parameter was passed.
    """

    # -- resolve the host the request is sent to ---------------------------
    try:
        host_index = kwargs["_host_index"]
        configuration = self.api_client.configuration
        if host_index is None:
            host_index = configuration.server_operation_index.get(
                self.settings["operation_id"], configuration.server_index
            )
        server_variables = configuration.server_operation_variables.get(
            self.settings["operation_id"], configuration.server_variables
        )
        _host = configuration.get_host_from_settings(
            host_index, variables=server_variables, servers=self.settings["servers"]
        )
    except IndexError:
        endpoint_servers = self.settings["servers"]
        if endpoint_servers:
            raise ApiValueError(
                "Invalid host index. Must be 0 <= index < %s" % len(endpoint_servers)
            )
        # no endpoint-specific servers: no explicit host is passed on
        _host = None

    # -- reject unknown parameters and disallowed None values --------------
    for name, supplied in kwargs.items():
        if name not in self.params_map["all"]:
            raise ApiTypeError(
                "Got an unexpected parameter '%s'"
                " to method `%s`" % (name, self.settings["operation_id"])
            )
        if name in self.params_map["nullable"]:
            continue
        if supplied is not None:
            continue
        # only throw this nullable ApiValueError if _validate_inputs
        # is False, if _validate_inputs==True we catch this case
        # in self.__validate_inputs
        if kwargs["_validate_inputs"] is False:
            raise ApiValueError(
                "Value may not be None for non-nullable parameter `%s`"
                " when calling `%s`" % (name, self.settings["operation_id"])
            )

    # -- make sure nothing mandatory is missing -----------------------------
    for name in self.params_map["required"]:
        if name in kwargs:
            continue
        raise ApiValueError(
            "Missing the required parameter `%s` when calling "
            "`%s`" % (name, self.settings["operation_id"])
        )

    self.__validate_inputs(kwargs)

    params = self.__gather_params(kwargs)

    # -- negotiate the Accept / Content-Type headers ------------------------
    headers = params["header"]

    accept_options = self.headers_map["accept"]
    if accept_options:
        headers["Accept"] = self.api_client.select_header_accept(accept_options)

    forced_content_type = kwargs.get("_content_type")
    if forced_content_type:
        headers["Content-Type"] = forced_content_type
    else:
        content_type_options = self.headers_map["content_type"]
        if content_type_options and params["body"] != "":
            selected = self.api_client.select_header_content_type(
                content_type_options, self.settings["http_method"], params["body"]
            )
            if selected:
                headers["Content-Type"] = selected

    # -- dispatch -------------------------------------------------------------
    positional = (
        self.settings["endpoint_path"],
        self.settings["http_method"],
        params["path"],
        params["query"],
        params["header"],
    )
    options = {
        "body": params["body"],
        "post_params": params["form"],
        "files": params["file"],
        "response_schema": self.settings["response_schema"],
        "auth_settings": self.settings["auth"],
        "_async_call": kwargs["_async_call"],
        "_check_type": kwargs["_validate_outputs"],
        "_check_status": kwargs["_check_status"],
        "_parse_response": kwargs["_parse_response"],
        "_request_timeout": kwargs["_request_timeout"],
        "_host": _host,
        "_request_auths": kwargs["_request_auths"],
        "collection_formats": params["collection_format"],
    }
    return self.api_client.call_api(*positional, **options)
|