You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1081 lines
42 KiB
Python

# Copyright (C) 2022 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
# CVAT REST API
#
# REST API for Computer Vision Annotation Tool (CVAT) # noqa: E501
#
# The version of the OpenAPI document: alpha (2.0)
# Contact: support@cvat.ai
# Generated by: https://openapi-generator.tech
from __future__ import annotations
import atexit
import importlib
import io
import json
import mimetypes
import os
import re
import typing
from multiprocessing.pool import ThreadPool
from typing import TYPE_CHECKING
from urllib.parse import quote
from urllib3 import HTTPResponse
from urllib3.fields import RequestField
from cvat_sdk import rest
from cvat_sdk.configuration import Configuration
from cvat_sdk.exceptions import ApiException, ApiTypeError, ApiValueError
from cvat_sdk.model_utils import (
ModelComposed,
ModelNormal,
ModelSimple,
check_allowed_values,
check_validations,
date,
datetime,
deserialize_file,
file_type,
model_to_dict,
none_type,
validate_and_convert_types,
)
if TYPE_CHECKING:
# Enable introspection. Can't work normally due to cyclic imports
from cvat_sdk.apis import *
from cvat_sdk.models import *
class ApiClient(object):
"""Generic API client for OpenAPI client library builds.
OpenAPI generic API client. This client handles the client-
server communication, and is invariant across implementations. Specifics of
the methods and models for each application are generated from the OpenAPI
templates.
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Class members:
auth_api: AuthApi
cloud_storages_api: CloudStoragesApi
comments_api: CommentsApi
invitations_api: InvitationsApi
issues_api: IssuesApi
jobs_api: JobsApi
lambda_api: LambdaApi
memberships_api: MembershipsApi
organizations_api: OrganizationsApi
projects_api: ProjectsApi
restrictions_api: RestrictionsApi
schema_api: SchemaApi
server_api: ServerApi
tasks_api: TasksApi
users_api: UsersApi
"""
_pool = None
def __init__(
self, configuration=None, header_name=None, header_value=None, cookie=None, pool_threads=1
):
"""
:param configuration: configuration object for this client
:param header_name: a header to pass when making calls to the API.
:param header_value: a header value to pass when making calls to
the API.
:param cookie: a cookie to include in the header when making calls
to the API
:param pool_threads: The number of threads to use for async requests
to the API. More threads means more concurrent API requests.
"""
if configuration is None:
configuration = Configuration.get_default_copy()
self.configuration = configuration
self.pool_threads = pool_threads
self.rest_client = rest.RESTClientObject(configuration)
self.default_headers = {}
if header_name is not None:
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = "OpenAPI-Generator/2.0-alpha/python"
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def close(self):
if self._pool:
self._pool.close()
self._pool.join()
self._pool = None
if hasattr(atexit, "unregister"):
atexit.unregister(self.close)
@property
def pool(self):
"""Create thread pool on first request
avoids instantiating unused threadpool for blocking clients.
"""
if self._pool is None:
atexit.register(self.close)
self._pool = ThreadPool(self.pool_threads)
return self._pool
@property
def user_agent(self):
"""User agent for this API client"""
return self.default_headers["User-Agent"]
@user_agent.setter
def user_agent(self, value):
self.default_headers["User-Agent"] = value
def _serialize_post_parameter(self, obj):
if isinstance(obj, (str, int, float, none_type, bool)):
return ("", json.dumps(obj), "application/json")
elif isinstance(obj, io.IOBase):
return self._serialize_file(obj)
raise ApiValueError(
"Unable to prepare type {} for serialization".format(obj.__class__.__name__)
)
def _convert_body_to_post_params(self, body):
# body must be a flat structure, lists of primitives is possible
body = self.sanitize_for_serialization(body, read_files=False)
assert isinstance(body, dict), type(body)
post_params = []
for k, v in body.items():
if isinstance(v, (tuple, list)):
for i, entry in enumerate(v):
post_params.append((f"{k}[{i}]", self._serialize_post_parameter(entry)))
else:
post_params.append((k, self._serialize_post_parameter(v)))
return post_params
def set_default_header(self, header_name, header_value):
self.default_headers[header_name] = header_value
def __call_api(
self,
resource_path: str,
method: str,
path_params: typing.Optional[typing.Dict[str, typing.Any]] = None,
query_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None,
header_params: typing.Optional[typing.Dict[str, typing.Any]] = None,
body: typing.Optional[typing.Any] = None,
post_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None,
files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None,
response_schema: typing.Optional[typing.Tuple[typing.Any]] = None,
auth_settings: typing.Optional[typing.List[str]] = None,
collection_formats: typing.Optional[typing.Dict[str, str]] = None,
*,
_parse_response: bool = True,
_request_timeout: typing.Optional[typing.Union[int, float, typing.Tuple]] = None,
_host: typing.Optional[str] = None,
_check_type: typing.Optional[bool] = None,
_check_status: bool = True,
_request_auths: typing.Optional[typing.List[typing.Dict[str, typing.Any]]] = None,
):
config = self.configuration
# header parameters
header_params = header_params or {}
header_params.update(self.default_headers)
if self.cookie:
header_params["Cookie"] = self.cookie
if header_params:
header_params = self.sanitize_for_serialization(header_params)
header_params = dict(self.parameters_to_tuples(header_params, collection_formats))
# path parameters
if path_params:
path_params = self.sanitize_for_serialization(path_params)
path_params = self.parameters_to_tuples(path_params, collection_formats)
for k, v in path_params:
# specified safe chars, encode everything
resource_path = resource_path.replace(
"{%s}" % k, quote(str(v), safe=config.safe_chars_for_path_param)
)
# query parameters
if query_params:
query_params = self.sanitize_for_serialization(query_params)
query_params = self.parameters_to_tuples(query_params, collection_formats)
# post parameters
post_params = post_params if post_params else []
if post_params or files:
post_params = self.sanitize_for_serialization(post_params)
post_params = self.parameters_to_tuples(post_params, collection_formats)
post_params.extend(self.files_parameters(files))
if header_params.get("Content-Type", "").startswith("multipart"):
if body:
post_params.extend(self._convert_body_to_post_params(body))
body = None
if post_params:
post_params = self.parameters_to_multipart(post_params, (dict))
else:
# body
if body:
body = self.sanitize_for_serialization(body)
# auth setting
self.update_params_for_auth(
header_params,
query_params,
auth_settings,
resource_path,
method,
body,
request_auths=_request_auths,
)
# request url
if _host is None:
url = self.configuration.host + resource_path
else:
# use server/host defined in path or operation instead
url = _host + resource_path
try:
# perform request and return response
response = self.request(
method,
url,
query_params=query_params,
headers=header_params,
post_params=post_params,
body=body,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
)
except ApiException as e:
e.body = e.body.decode("utf-8")
raise e
self.last_response = response
return_data = None
if _parse_response and response_schema:
return_data = self.deserialize(response, response_schema, _check_type=_check_type)
return (return_data, response)
def parameters_to_multipart(self, params, collection_types):
"""Get parameters as list of tuples, formatting as json if value is collection_types
:param params: Parameters as list of two-tuples
:param dict collection_types: Parameter collection types
:return: Parameters as list of tuple or urllib3.fields.RequestField
"""
new_params = []
if collection_types is None:
collection_types = dict
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
if isinstance(
v, collection_types
): # v is instance of collection_type, formatting as application/json
v = json.dumps(v, ensure_ascii=False).encode("utf-8")
field = RequestField(k, v)
field.make_multipart(content_type="application/json; charset=utf-8")
new_params.append(field)
else:
new_params.append((k, v))
return new_params
@classmethod
def sanitize_for_serialization(cls, obj, *, read_files: bool = True):
"""Prepares data for transmission before it is sent with the rest client
If obj is None, return None.
If obj is str, int, long, float, bool, return directly.
If obj is datetime.datetime, datetime.date
convert to string in iso8601 format.
If obj is list, sanitize each element in the list.
If obj is dict, return the dict.
If obj is OpenAPI model, return the properties dict.
If obj is io.IOBase, return the bytes
:param obj: The data to serialize.
:param read_files: Whether to read file data or leave files as is.
:return: The serialized form of data.
"""
if isinstance(obj, (ModelNormal, ModelComposed)):
return {
key: cls.sanitize_for_serialization(val, read_files=read_files)
for key, val in model_to_dict(obj, serialize=True).items()
}
elif isinstance(obj, io.IOBase):
if read_files:
return cls.get_file_data_and_close_file(obj)
else:
return obj
elif isinstance(obj, (str, int, float, none_type, bool)):
return obj
elif isinstance(obj, (datetime, date)):
return obj.isoformat()
elif isinstance(obj, ModelSimple):
return cls.sanitize_for_serialization(obj.value, read_files=read_files)
elif isinstance(obj, (list, tuple)):
return [cls.sanitize_for_serialization(item, read_files=read_files) for item in obj]
if isinstance(obj, dict):
return {
key: cls.sanitize_for_serialization(val, read_files=read_files)
for key, val in obj.items()
}
raise ApiValueError(
"Unable to prepare type {} for serialization".format(obj.__class__.__name__)
)
def deserialize(
self, response: HTTPResponse, response_schema: typing.Tuple, *, _check_type: bool
):
"""Deserializes response into an object.
:param response (urllib3.HTTPResponse): object to be deserialized.
:param response_schema: For the response, a tuple containing:
valid classes
a list containing valid classes (for list schemas)
a dict containing a tuple of valid classes as the value
Example values:
(str,)
(Pet,)
(float, none_type)
([int, none_type],)
({str: (bool, str, int, float, date, datetime, str, none_type)},)
:param _check_type (bool): whether to check the types of the data
received from the server
:return: deserialized object
"""
if response_schema == (file_type,):
# handle file downloading
# save response body into a tmp file and return the instance
content_disposition = response.getheader("Content-Disposition")
return deserialize_file(
response.data, self.configuration, content_disposition=content_disposition
)
encoding = "utf-8"
content_type = response.getheader("content-type")
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s\;]?", content_type)
if match:
encoding = match.group(1)
response_data = response.data.decode(encoding)
# fetch data from response object
try:
received_data = json.loads(response_data)
except ValueError:
received_data = response_data
# store our data under the key of 'received_data' so users have some
# context if they are deserializing a string and the data type is wrong
deserialized_data = validate_and_convert_types(
received_data,
response_schema,
["received_data"],
True,
_check_type,
configuration=self.configuration,
)
return deserialized_data
def call_api(
self,
resource_path: str,
method: str,
path_params: typing.Optional[typing.Dict[str, typing.Any]] = None,
query_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None,
header_params: typing.Optional[typing.Dict[str, typing.Any]] = None,
body: typing.Optional[typing.Any] = None,
post_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None,
files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None,
response_schema: typing.Optional[typing.Tuple[typing.Any]] = None,
auth_settings: typing.Optional[typing.List[str]] = None,
collection_formats: typing.Optional[typing.Dict[str, str]] = None,
*,
_async_call: typing.Optional[bool] = None,
_parse_response: bool = True,
_request_timeout: typing.Optional[typing.Union[int, float, typing.Tuple]] = None,
_host: typing.Optional[str] = None,
_check_type: typing.Optional[bool] = None,
_request_auths: typing.Optional[typing.List[typing.Dict[str, typing.Any]]] = None,
_check_status: bool = True,
):
"""Makes the HTTP request (synchronous) and returns deserialized data.
To make an _async_call request, set the _async_call parameter.
:param resource_path: Path to method endpoint.
:param method: Method to call.
:param path_params: Path parameters in the url.
:param query_params: Query parameters in the url.
:param header_params: Header parameters to be
placed in the request header.
:param body: Request body.
:param post_params dict: Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
:param auth_settings list: Auth Settings names for the request.
:param response_schema: For the response, a tuple containing:
valid classes
a list containing valid classes (for list schemas)
a dict containing a tuple of valid classes as the value
Example values:
(str,)
(Pet,)
(float, none_type)
([int, none_type],)
({str: (bool, str, int, float, date, datetime, str, none_type)},)
:param files: key -> field name, value -> a list of open file
objects for `multipart/form-data`.
:type files: dict
:param _async_call bool: execute request asynchronously
:type _async_call: bool, optional
:param collection_formats: dict of collection formats for path, query,
header, and post parameters.
:type collection_formats: dict, optional
:param _parse_response: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:type _parse_response: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _check_type: boolean describing if the data back from the server
should have its type checked.
:type _check_type: bool, optional
:param _request_auths: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auths: list, optional
:return:
If _async_call parameter is True,
the request will be called asynchronously.
The method will return the request thread.
If parameter _async_call is False or missing,
then the method will return the response directly.
"""
params = {
"resource_path": resource_path,
"method": method,
"path_params": path_params,
"query_params": query_params,
"header_params": header_params,
"body": body,
"post_params": post_params,
"files": files,
"response_schema": response_schema,
"auth_settings": auth_settings,
"collection_formats": collection_formats,
"_parse_response": _parse_response,
"_request_timeout": _request_timeout,
"_host": _host,
"_check_type": _check_type,
"_request_auths": _request_auths,
"_check_status": _check_status,
}
if not _async_call:
return self.__call_api(**params)
return self.pool.apply_async(self.__call_api, (), kwds=params)
def request(
self,
method,
url,
query_params=None,
headers=None,
post_params=None,
body=None,
*,
_parse_response=True,
_request_timeout=None,
_check_status=True,
):
"""Makes the HTTP request using RESTClient."""
if method == "GET":
return self.rest_client.GET(
url,
query_params=query_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
headers=headers,
_check_status=_check_status,
)
elif method == "HEAD":
return self.rest_client.HEAD(
url,
query_params=query_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
headers=headers,
_check_status=_check_status,
)
elif method == "OPTIONS":
return self.rest_client.OPTIONS(
url,
query_params=query_params,
headers=headers,
post_params=post_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body,
)
elif method == "POST":
return self.rest_client.POST(
url,
query_params=query_params,
headers=headers,
post_params=post_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body,
)
elif method == "PUT":
return self.rest_client.PUT(
url,
query_params=query_params,
headers=headers,
post_params=post_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body,
)
elif method == "PATCH":
return self.rest_client.PATCH(
url,
query_params=query_params,
headers=headers,
post_params=post_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body,
)
elif method == "DELETE":
return self.rest_client.DELETE(
url,
query_params=query_params,
headers=headers,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body,
)
else:
raise ApiValueError(
"http method must be `GET`, `HEAD`, `OPTIONS`,"
" `POST`, `PATCH`, `PUT` or `DELETE`."
)
def parameters_to_tuples(self, params, collection_formats):
"""Get parameters as list of tuples, formatting collections.
:param params: Parameters as dict or list of two-tuples
:param dict collection_formats: Parameter collection formats
:return: Parameters as list of tuples, collections formatted
"""
new_params = []
if collection_formats is None:
collection_formats = {}
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
if k in collection_formats:
collection_format = collection_formats[k]
if collection_format == "multi":
new_params.extend((k, value) for value in v)
else:
if collection_format == "ssv":
delimiter = " "
elif collection_format == "tsv":
delimiter = "\t"
elif collection_format == "pipes":
delimiter = "|"
else: # csv is the default
delimiter = ","
new_params.append((k, delimiter.join(str(value) for value in v)))
else:
new_params.append((k, v))
return new_params
@staticmethod
def get_file_data_and_close_file(file_instance: io.IOBase) -> bytes:
file_data = file_instance.read()
file_instance.close()
return file_data
def _serialize_file(
self, file_instance: io.IOBase
) -> typing.Tuple[str, typing.Union[str, bytes], str]:
if file_instance.closed is True:
raise ApiValueError("Cannot read a closed file.")
filename = os.path.basename(file_instance.name)
filedata = self.get_file_data_and_close_file(file_instance)
mimetype = mimetypes.guess_type(filename)[0] or "application/octet-stream"
return filename, filedata, mimetype
def files_parameters(
self, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None
):
"""Builds form parameters.
:param files: None or a dict with key=param_name and
value is a list of open file objects
:return: List of tuples of form parameters with file data
"""
if files is None:
return []
params = []
for param_name, file_instances in files.items():
if file_instances is None:
# if the file field is nullable, skip None values
continue
for file_instance in file_instances:
if file_instance is None:
# if the file field is nullable, skip None values
continue
try:
params.append((param_name, self._serialize_file(file_instance)))
except ApiValueError as e:
raise ApiValueError(
"The passed in file_type " "for %s must be open." % param_name
) from e
return params
def select_header_accept(self, accepts):
"""Returns `Accept` based on an array of accepts provided.
:param accepts: List of headers.
:return: Accept (e.g. application/json).
"""
if not accepts:
return
accepts = [x.lower() for x in accepts]
if "application/json" in accepts:
return "application/json"
else:
return ", ".join(accepts)
def select_header_content_type(self, content_types, method=None, body=None):
"""Returns `Content-Type` based on an array of content_types provided.
:param content_types: List of content-types.
:param method: http method (e.g. POST, PATCH).
:param body: http body to send.
:return: Content-Type (e.g. application/json).
"""
if not content_types:
return None
content_types = [x.lower() for x in content_types]
if (
method == "PATCH"
and "application/json-patch+json" in content_types
and isinstance(body, list)
):
return "application/json-patch+json"
if "application/json" in content_types or "*/*" in content_types:
return "application/json"
else:
return content_types[0]
def update_params_for_auth(
self, headers, queries, auth_settings, resource_path, method, body, request_auths=None
):
"""Updates header and query params based on authentication setting.
:param headers: Header parameters dict to be updated.
:param queries: Query parameters tuple list to be updated.
:param auth_settings: Authentication setting identifiers list.
:param resource_path: A string representation of the HTTP request resource path.
:param method: A string representation of the HTTP request method.
:param body: A object representing the body of the HTTP request.
The object type is the return value of _encoder.default().
:param request_auths: if set, the provided settings will
override the token in the configuration.
"""
if not auth_settings:
return
if request_auths:
for auth_setting in request_auths:
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
return
for auth in auth_settings:
auth_setting = self.configuration.auth_settings().get(auth)
if auth_setting:
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting):
if auth_setting["in"] == "cookie":
headers["Cookie"] = auth_setting["key"] + "=" + auth_setting["value"]
elif auth_setting["in"] == "header":
if auth_setting["type"] != "http-signature":
headers[auth_setting["key"]] = auth_setting["value"]
elif auth_setting["in"] == "query":
queries.append((auth_setting["key"], auth_setting["value"]))
else:
raise ApiValueError("Authentication token must be in `query` or `header`")
auth_api: "AuthApi"
cloud_storages_api: "CloudStoragesApi"
comments_api: "CommentsApi"
invitations_api: "InvitationsApi"
issues_api: "IssuesApi"
jobs_api: "JobsApi"
lambda_api: "LambdaApi"
memberships_api: "MembershipsApi"
organizations_api: "OrganizationsApi"
projects_api: "ProjectsApi"
restrictions_api: "RestrictionsApi"
schema_api: "SchemaApi"
server_api: "ServerApi"
tasks_api: "TasksApi"
users_api: "UsersApi"
_apis: typing.Dict[str, object] = {
"auth_api": [None, "AuthApi"],
"cloud_storages_api": [None, "CloudStoragesApi"],
"comments_api": [None, "CommentsApi"],
"invitations_api": [None, "InvitationsApi"],
"issues_api": [None, "IssuesApi"],
"jobs_api": [None, "JobsApi"],
"lambda_api": [None, "LambdaApi"],
"memberships_api": [None, "MembershipsApi"],
"organizations_api": [None, "OrganizationsApi"],
"projects_api": [None, "ProjectsApi"],
"restrictions_api": [None, "RestrictionsApi"],
"schema_api": [None, "SchemaApi"],
"server_api": [None, "ServerApi"],
"tasks_api": [None, "TasksApi"],
"users_api": [None, "UsersApi"],
}
def _make_api_instance(self, klass_name):
package = __name__.rsplit(".", maxsplit=1)[0]
module = importlib.import_module(package + ".apis")
api_klass = getattr(module, klass_name)
return api_klass(self)
def __getattr__(self, key):
notfound = object()
api_instance, api_klassname = self._apis.get(key, notfound)
if api_instance is notfound:
raise AttributeError(f"Can't find the '{key}' attribute")
if api_instance is None:
api_instance = self._make_api_instance(api_klassname)
setattr(self, key, api_instance)
return api_instance
class Endpoint(object):
def __init__(
self,
settings: typing.Optional[typing.Dict[str, typing.Any]] = None,
params_map: typing.Optional[typing.Dict[str, typing.Any]] = None,
root_map: typing.Optional[typing.Dict[str, typing.Any]] = None,
headers_map: typing.Optional[typing.Dict[str, typing.Any]] = None,
api_client: typing.Optional[ApiClient] = None,
):
"""Creates an endpoint
Args:
settings (dict): see below key value pairs
'response_schema' (tuple/None): response type
'auth' (list): a list of auth type keys
'endpoint_path' (str): the endpoint path
'operation_id' (str): endpoint string identifier
'http_method' (str): POST/PUT/PATCH/GET etc
'servers' (list): list of str servers that this endpoint is at
params_map (dict): see below key value pairs
'all' (list): list of str endpoint parameter names
'required' (list): list of required parameter names
'nullable' (list): list of nullable parameter names
'enum' (list): list of parameters with enum values
'validation' (list): list of parameters with validations
root_map (dict):
'validations' (dict): the dict mapping endpoint parameter tuple
paths to their validation dictionaries
'allowed_values' (dict): the dict mapping endpoint parameter
tuple paths to their allowed_values (enum) dictionaries
'openapi_types' (dict): param_name to openapi type
'attribute_map' (dict): param_name to camelCase name
'location_map' (dict): param_name to 'body', 'file', 'form',
'header', 'path', 'query'
collection_format_map (dict): param_name to `csv` etc.
headers_map (dict): see below key value pairs
'accept' (list): list of Accept header strings
'content_type' (list): list of Content-Type header strings
api_client (ApiClient) api client instance
"""
self.settings = settings
self.params_map = params_map
self.params_map["all"].extend(
[
"_async_call",
"_host_index",
"_parse_response",
"_request_timeout",
"_validate_inputs",
"_validate_outputs",
"_check_status",
"_content_type",
"_spec_property_naming",
"_request_auths",
]
)
self.params_map["nullable"].extend(["_request_timeout"])
self.validations = root_map["validations"]
self.allowed_values = root_map["allowed_values"]
self.openapi_types = root_map["openapi_types"]
extra_types = {
"_async_call": (bool,),
"_host_index": (none_type, int),
"_parse_response": (bool,),
"_request_timeout": (none_type, float, (float,), [float], int, (int,), [int]),
"_validate_inputs": (bool,),
"_validate_outputs": (bool,),
"_check_status": (bool,),
"_spec_property_naming": (bool,),
"_content_type": (none_type, str),
"_request_auths": (none_type, list),
}
self.openapi_types.update(extra_types)
self.attribute_map = root_map["attribute_map"]
self.location_map = root_map["location_map"]
self.collection_format_map = root_map["collection_format_map"]
self.headers_map = headers_map
self.api_client = api_client
def __validate_inputs(self, kwargs):
for param in self.params_map["enum"]:
if param in kwargs:
check_allowed_values(self.allowed_values, (param,), kwargs[param])
for param in self.params_map["validation"]:
if param in kwargs:
check_validations(
self.validations,
(param,),
kwargs[param],
configuration=self.api_client.configuration,
)
if kwargs["_validate_inputs"] is False:
return
for key, value in kwargs.items():
fixed_val = validate_and_convert_types(
value,
self.openapi_types[key],
[key],
kwargs["_spec_property_naming"],
kwargs["_validate_inputs"],
configuration=self.api_client.configuration,
)
kwargs[key] = fixed_val
def __gather_params(self, kwargs):
params = {
"body": None,
"collection_format": {},
"file": {},
"form": [],
"header": {},
"path": {},
"query": [],
}
for param_name, param_value in kwargs.items():
param_location = self.location_map.get(param_name)
if param_location is None:
continue
if param_location:
if param_location == "body":
params["body"] = param_value
continue
base_name = self.attribute_map[param_name]
if param_location == "form" and self.openapi_types[param_name] == (file_type,):
params["file"][base_name] = [param_value]
elif param_location == "form" and self.openapi_types[param_name] == ([file_type],):
# param_value is already a list
params["file"][base_name] = param_value
elif param_location in {"form", "query"}:
param_value_full = (base_name, param_value)
params[param_location].append(param_value_full)
if param_location not in {"form", "query"}:
params[param_location][base_name] = param_value
collection_format = self.collection_format_map.get(param_name)
if collection_format:
params["collection_format"][base_name] = collection_format
return params
def call_with_http_info(
self, **kwargs
) -> typing.Tuple[typing.Optional[typing.Any], HTTPResponse]:
"""
Keyword Args:
endpoint args
_preload_content (bool): if False, the urllib3.HTTPResponse object
will be returned without reading/decoding response data.
Default is True.
_request_timeout (int/float/tuple): timeout setting for this request. If
one number provided, it will be total request timeout. It can also
be a pair (tuple) of (connection, read) timeouts.
Default is None.
_validate_inputs (bool): specifies if type checking
should be done one the data sent to the server.
Default is True.
_validate_outputs (bool): specifies if type checking
should be done one the data received from the server.
Default is True.
_spec_property_naming (bool): True if the variable names in the input data
are serialized names, as specified in the OpenAPI document.
False if the variable names in the input data
are pythonic names, e.g. snake case (default)
_content_type (str/None): force body content-type.
Default is None and content-type will be predicted by allowed
content-types and body.
_host_index (int/None): specifies the index of the server
that we want to use.
Default is read from the configuration.
_request_auths (list): set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
Default is None
_async_call (bool): execute request asynchronously
Returns:
(parsed_model, response)
If the method is called asynchronously, returns the request
thread.
"""
try:
index = (
self.api_client.configuration.server_operation_index.get(
self.settings["operation_id"], self.api_client.configuration.server_index
)
if kwargs["_host_index"] is None
else kwargs["_host_index"]
)
server_variables = self.api_client.configuration.server_operation_variables.get(
self.settings["operation_id"], self.api_client.configuration.server_variables
)
_host = self.api_client.configuration.get_host_from_settings(
index, variables=server_variables, servers=self.settings["servers"]
)
except IndexError:
if self.settings["servers"]:
raise ApiValueError(
"Invalid host index. Must be 0 <= index < %s" % len(self.settings["servers"])
)
_host = None
for key, value in kwargs.items():
if key not in self.params_map["all"]:
raise ApiTypeError(
"Got an unexpected parameter '%s'"
" to method `%s`" % (key, self.settings["operation_id"])
)
# only throw this nullable ApiValueError if _validate_inputs
# is False, if _validate_inputs==True we catch this case
# in self.__validate_inputs
if (
key not in self.params_map["nullable"]
and value is None
and kwargs["_validate_inputs"] is False
):
raise ApiValueError(
"Value may not be None for non-nullable parameter `%s`"
" when calling `%s`" % (key, self.settings["operation_id"])
)
for key in self.params_map["required"]:
if key not in kwargs.keys():
raise ApiValueError(
"Missing the required parameter `%s` when calling "
"`%s`" % (key, self.settings["operation_id"])
)
self.__validate_inputs(kwargs)
params = self.__gather_params(kwargs)
accept_headers_list = self.headers_map["accept"]
if accept_headers_list:
params["header"]["Accept"] = self.api_client.select_header_accept(accept_headers_list)
if kwargs.get("_content_type"):
params["header"]["Content-Type"] = kwargs["_content_type"]
else:
content_type_headers_list = self.headers_map["content_type"]
if content_type_headers_list:
if params["body"] != "":
content_types_list = self.api_client.select_header_content_type(
content_type_headers_list, self.settings["http_method"], params["body"]
)
if content_types_list:
params["header"]["Content-Type"] = content_types_list
return self.api_client.call_api(
self.settings["endpoint_path"],
self.settings["http_method"],
params["path"],
params["query"],
params["header"],
body=params["body"],
post_params=params["form"],
files=params["file"],
response_schema=self.settings["response_schema"],
auth_settings=self.settings["auth"],
_async_call=kwargs["_async_call"],
_check_type=kwargs["_validate_outputs"],
_check_status=kwargs["_check_status"],
_parse_response=kwargs["_parse_response"],
_request_timeout=kwargs["_request_timeout"],
_host=_host,
_request_auths=kwargs["_request_auths"],
collection_formats=params["collection_format"],
)