# Copyright (C) 2022 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

# CVAT REST API
#
# REST API for Computer Vision Annotation Tool (CVAT)  # noqa: E501
#
# The version of the OpenAPI document: alpha (2.0)
# Contact: support@cvat.ai
# Generated by: https://openapi-generator.tech

import io
import ipaddress
import json
import logging
import re
import ssl
from urllib.parse import urlencode, urlparse
from urllib.request import proxy_bypass_environment

import urllib3

from cvat_sdk.exceptions import (
    ApiException,
    ApiValueError,
    ForbiddenException,
    NotFoundException,
    ServiceException,
    UnauthorizedException,
)

logger = logging.getLogger(__name__)


class RESTClientObject(object):
    """Thin HTTP client built on a urllib3 pool manager.

    Requests are sent through ``self.pool_manager``, which is either a
    ``urllib3.ProxyManager`` or a ``urllib3.PoolManager`` depending on the
    proxy settings found on the configuration object.
    """

    def __init__(self, configuration, pools_size=4, maxsize=None):
        """Create the underlying urllib3 pool manager.

        :param configuration: object providing ``verify_ssl``,
            ``assert_hostname``, ``retries``, ``socket_options``,
            ``connection_pool_maxsize``, ``proxy``, ``no_proxy``, ``host``,
            ``ssl_ca_cert``, ``cert_file``, ``key_file`` and (when a proxy
            is used) ``proxy_headers``.
        :param pools_size: value passed to the manager as ``num_pools``.
        :param maxsize: per-pool connection limit. When None, falls back to
            ``configuration.connection_pool_maxsize`` and then to 4.
        """
        # urllib3.PoolManager will pass all kw parameters to connectionpool
        # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75  # noqa: E501
        # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680  # noqa: E501
        # maxsize is the number of requests to host that are allowed in parallel  # noqa: E501
        # Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html  # noqa: E501

        # cert_reqs
        if configuration.verify_ssl:
            cert_reqs = ssl.CERT_REQUIRED
        else:
            cert_reqs = ssl.CERT_NONE

        # Optional settings are forwarded only when explicitly configured.
        addition_pool_args = {}
        if configuration.assert_hostname is not None:
            addition_pool_args["assert_hostname"] = configuration.assert_hostname  # noqa: E501

        if configuration.retries is not None:
            addition_pool_args["retries"] = configuration.retries

        if configuration.socket_options is not None:
            addition_pool_args["socket_options"] = configuration.socket_options

        if maxsize is None:
            if configuration.connection_pool_maxsize is not None:
                maxsize = configuration.connection_pool_maxsize
            else:
                maxsize = 4

        # https pool manager
        if configuration.proxy and not should_bypass_proxies(
            configuration.host, no_proxy=configuration.no_proxy or ""
        ):
            self.pool_manager = urllib3.ProxyManager(
                num_pools=pools_size,
                maxsize=maxsize,
                cert_reqs=cert_reqs,
                ca_certs=configuration.ssl_ca_cert,
                cert_file=configuration.cert_file,
                key_file=configuration.key_file,
                proxy_url=configuration.proxy,
                proxy_headers=configuration.proxy_headers,
                **addition_pool_args,
            )
        else:
            self.pool_manager = urllib3.PoolManager(
                num_pools=pools_size,
                maxsize=maxsize,
                cert_reqs=cert_reqs,
                ca_certs=configuration.ssl_ca_cert,
                cert_file=configuration.cert_file,
                key_file=configuration.key_file,
                **addition_pool_args,
            )

    def request(
        self,
        method,
        url,
        query_params=None,
        headers=None,
        files=None,
        body=None,
        post_params=None,
        *,
        _parse_response=True,
        _request_timeout=None,
        _check_status=True,
    ) -> urllib3.HTTPResponse:
        """Perform requests.

        Note that a ``headers`` dict passed by the caller is modified in
        place: a default ``Content-Type`` may be added, and a
        ``multipart/form-data`` ``Content-Type`` is removed before sending.

        :param method: http request method
        :param url: http request url
        :param query_params: query parameters in the url
        :param headers: http request headers
        :param body: request json body, for `application/json`
        :param post_params: plain parameters for POST/PUT/PATCH requests,
                            when 'Content-Type' is
                            `application/x-www-form-urlencoded`
                            and `multipart/form-data`
        :param files: file parameters for POST/PUT/PATCH requests
        :param _parse_response: if False, the urllib3.HTTPResponse object will
                                be returned without reading/decoding response
                                data. Default is True.
        :param _request_timeout: timeout setting for this request. If one
                                 number provided, it will be total request
                                 timeout. It can also be a pair (tuple) of
                                 (connection, read) timeouts.
        :param _check_status: if True (the default), a response whose status
                              is outside 200-299 raises an exception instead
                              of being returned.
        :return: the response object returned by the pool manager.
        :raises ApiValueError: if both `body` and `post_params` are given.
        :raises UnauthorizedException: on status 401 (when `_check_status`).
        :raises ForbiddenException: on status 403 (when `_check_status`).
        :raises NotFoundException: on status 404 (when `_check_status`).
        :raises ServiceException: on status 500-599 (when `_check_status`).
        :raises ApiException: on any other non-2xx status (when
                              `_check_status`), on an SSL error, or when the
                              arguments do not match the declared content
                              type.
        """
        method = method.upper()
        assert method in ["GET", "HEAD", "DELETE", "POST", "PUT", "PATCH", "OPTIONS"]

        if post_params and body:
            raise ApiValueError("body parameter cannot be used with post_params parameter.")

        post_params = post_params or {}
        headers = headers or {}

        timeout = None
        if _request_timeout:
            if isinstance(_request_timeout, (int, float)):  # noqa: E501,F821
                timeout = urllib3.Timeout(total=_request_timeout)
            elif isinstance(_request_timeout, tuple) and len(_request_timeout) == 2:
                timeout = urllib3.Timeout(connect=_request_timeout[0], read=_request_timeout[1])

        try:
            # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE`
            if method in ["POST", "PUT", "PATCH", "OPTIONS", "DELETE"]:
                # Only set a default Content-Type for POST, PUT, PATCH and OPTIONS requests
                if (method != "DELETE") and ("Content-Type" not in headers):
                    headers["Content-Type"] = "application/json"
                if query_params:
                    url += "?" + urlencode(query_params)
                if ("Content-Type" not in headers) or (
                    re.search("json", headers["Content-Type"], re.IGNORECASE)
                ):
                    request_body = None
                    if body is not None:
                        request_body = json.dumps(body)
                    r = self.pool_manager.request(
                        method,
                        url,
                        body=request_body,
                        preload_content=_parse_response,
                        timeout=timeout,
                        headers=headers,
                    )
                elif headers["Content-Type"] == "application/x-www-form-urlencoded":  # noqa: E501
                    if files:
                        # Use the same status/reason keywords as the other
                        # raises in this method; passed positionally the
                        # message would not end up in `reason`.
                        raise ApiException(
                            status=0,
                            reason=(
                                "Files cannot be used when Content-Type "
                                "is 'application/x-www-form-urlencoded'"
                            ),
                        )
                    r = self.pool_manager.request(
                        method,
                        url,
                        fields=post_params,
                        encode_multipart=False,
                        preload_content=_parse_response,
                        timeout=timeout,
                        headers=headers,
                    )
                elif headers["Content-Type"] == "multipart/form-data":
                    # must del headers['Content-Type'], or the correct
                    # Content-Type which generated by urllib3 will be
                    # overwritten.
                    del headers["Content-Type"]
                    r = self.pool_manager.request(
                        method,
                        url,
                        fields=post_params,
                        encode_multipart=True,
                        preload_content=_parse_response,
                        timeout=timeout,
                        headers=headers,
                    )
                # Pass a `string` parameter directly in the body to support
                # other content types than Json when `body` argument is
                # provided in serialized form
                elif isinstance(body, (str, bytes)) or files:
                    request_body = body
                    r = self.pool_manager.request(
                        method,
                        url,
                        body=request_body,
                        preload_content=_parse_response,
                        timeout=timeout,
                        headers=headers,
                    )
                else:
                    # Cannot generate the request from given parameters
                    msg = (
                        "Cannot prepare a request message for provided arguments. "
                        "Please check that your arguments match declared content type."
                    )
                    raise ApiException(status=0, reason=msg)
            # For `GET`, `HEAD`
            else:
                r = self.pool_manager.request(
                    method,
                    url,
                    fields=query_params,
                    preload_content=_parse_response,
                    timeout=timeout,
                    headers=headers,
                )
        except urllib3.exceptions.SSLError as e:
            msg = "{0}\n{1}".format(type(e).__name__, str(e))
            raise ApiException(status=0, reason=msg) from e

        if _check_status and not 200 <= r.status <= 299:
            if r.status == 401:
                raise UnauthorizedException(http_resp=r)

            if r.status == 403:
                raise ForbiddenException(http_resp=r)

            if r.status == 404:
                raise NotFoundException(http_resp=r)

            if 500 <= r.status <= 599:
                raise ServiceException(http_resp=r)

            raise ApiException(http_resp=r)

        return r

    def GET(
        self,
        url,
        headers=None,
        query_params=None,
        _parse_response=True,
        _request_timeout=None,
        _check_status=True,
    ):
        """Send a GET request; arguments are forwarded to :meth:`request`."""
        return self.request(
            "GET",
            url,
            headers=headers,
            _parse_response=_parse_response,
            _request_timeout=_request_timeout,
            _check_status=_check_status,
            query_params=query_params,
        )

    def HEAD(
        self,
        url,
        headers=None,
        query_params=None,
        _parse_response=True,
        _request_timeout=None,
        _check_status=True,
    ):
        """Send a HEAD request; arguments are forwarded to :meth:`request`."""
        return self.request(
            "HEAD",
            url,
            headers=headers,
            _parse_response=_parse_response,
            _request_timeout=_request_timeout,
            query_params=query_params,
            _check_status=_check_status,
        )

    def OPTIONS(
        self,
        url,
        headers=None,
        query_params=None,
        post_params=None,
        body=None,
        _parse_response=True,
        _request_timeout=None,
        _check_status=True,
    ):
        """Send an OPTIONS request; arguments are forwarded to :meth:`request`."""
        return self.request(
            "OPTIONS",
            url,
            headers=headers,
            query_params=query_params,
            post_params=post_params,
            _parse_response=_parse_response,
            _request_timeout=_request_timeout,
            _check_status=_check_status,
            body=body,
        )

    def DELETE(
        self,
        url,
        headers=None,
        query_params=None,
        body=None,
        _parse_response=True,
        _request_timeout=None,
        _check_status=True,
    ):
        """Send a DELETE request; arguments are forwarded to :meth:`request`."""
        return self.request(
            "DELETE",
            url,
            headers=headers,
            query_params=query_params,
            _parse_response=_parse_response,
            _request_timeout=_request_timeout,
            _check_status=_check_status,
            body=body,
        )

    def POST(
        self,
        url,
        headers=None,
        query_params=None,
        post_params=None,
        body=None,
        _parse_response=True,
        _request_timeout=None,
        _check_status=True,
    ):
        """Send a POST request; arguments are forwarded to :meth:`request`."""
        return self.request(
            "POST",
            url,
            headers=headers,
            query_params=query_params,
            post_params=post_params,
            _parse_response=_parse_response,
            _request_timeout=_request_timeout,
            _check_status=_check_status,
            body=body,
        )

    def PUT(
        self,
        url,
        headers=None,
        query_params=None,
        post_params=None,
        body=None,
        _parse_response=True,
        _request_timeout=None,
        _check_status=True,
    ):
        """Send a PUT request; arguments are forwarded to :meth:`request`."""
        return self.request(
            "PUT",
            url,
            headers=headers,
            query_params=query_params,
            post_params=post_params,
            _parse_response=_parse_response,
            _request_timeout=_request_timeout,
            _check_status=_check_status,
            body=body,
        )

    def PATCH(
        self,
        url,
        headers=None,
        query_params=None,
        post_params=None,
        body=None,
        _parse_response=True,
        _request_timeout=None,
        _check_status=True,
    ):
        """Send a PATCH request; arguments are forwarded to :meth:`request`."""
        return self.request(
            "PATCH",
            url,
            headers=headers,
            query_params=query_params,
            post_params=post_params,
            _parse_response=_parse_response,
            _request_timeout=_request_timeout,
            _check_status=_check_status,
            body=body,
        )


# end of class RESTClientObject


def is_ipv4(target):
    """Test if IPv4 address or not"""
    try:
        ipaddress.IPv4Address(target)
    except ipaddress.AddressValueError:
        return False
    return True


def in_ipv4net(target, net):
    """Test if target belongs to given IPv4 network

    ``net`` may be a bare address or a CIDR block. It is parsed
    non-strictly, so an entry with host bits set such as
    ``192.168.1.1/24`` is treated as the network ``192.168.1.0/24``
    instead of raising. Anything that cannot be parsed yields False.
    """
    try:
        network = ipaddress.IPv4Network(net, strict=False)
        address = ipaddress.IPv4Address(target)
    except ValueError:
        # AddressValueError and NetmaskValueError are both ValueError
        # subclasses, so this covers every parse failure.
        return False
    return address in network


def should_bypass_proxies(url, no_proxy=None):
    """Yet another requests.should_bypass_proxies

    Test if proxies should not be used for a particular url.

    :param url: url whose hostname is checked.
    :param no_proxy: comma-separated list of hosts / IPv4 networks that must
        not go through the proxy, or ``"*"`` to bypass for every host.
    :return: True if the proxy should be bypassed. A url without a hostname
        always bypasses; an empty ``no_proxy`` never does.
    """
    parsed = urlparse(url)

    # special cases
    if parsed.hostname in [None, ""]:
        return True

    # special cases
    if no_proxy in [None, ""]:
        return False
    if no_proxy == "*":
        return True

    no_proxy = no_proxy.lower().replace(" ", "")
    entries = (host for host in no_proxy.split(",") if host)

    # For a literal IPv4 host, first try matching entries as addresses/networks.
    if is_ipv4(parsed.hostname) and any(
        in_ipv4net(parsed.hostname, item) for item in entries
    ):
        return True

    return proxy_bypass_environment(parsed.hostname, {"no": no_proxy})