diff --git a/CHANGELOG.md b/CHANGELOG.md index c63edcfe..c00aa3f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,8 @@ non-ascii paths while adding files from "Connected file share" (issue #4428) - Create manifest with cvat/server docker container command () - Cannot assign a resource to a user who has an organization () - Oriented bounding boxes broken with COCO format ss() +- Fixed upload resumption in production environments + () ### Security - TDB diff --git a/cvat-sdk/cvat_sdk/core/uploading.py b/cvat-sdk/cvat_sdk/core/uploading.py index d51fa76d..9747060a 100644 --- a/cvat-sdk/cvat_sdk/core/uploading.py +++ b/cvat-sdk/cvat_sdk/core/uploading.py @@ -13,6 +13,7 @@ import requests import urllib3 from cvat_sdk.api_client.api_client import ApiClient, Endpoint +from cvat_sdk.api_client.exceptions import ApiException from cvat_sdk.api_client.rest import RESTClientObject from cvat_sdk.core.helpers import StreamWithProgress, expect_status from cvat_sdk.core.progress import NullProgressReporter, ProgressReporter @@ -20,14 +21,123 @@ from cvat_sdk.core.progress import NullProgressReporter, ProgressReporter if TYPE_CHECKING: from cvat_sdk.core.client import Client +import tusclient.uploader as tus_uploader +from tusclient.client import TusClient as _TusClient +from tusclient.client import Uploader as _TusUploader +from tusclient.request import TusRequest as _TusRequest +from tusclient.request import TusUploadFailed as _TusUploadFailed + MAX_REQUEST_SIZE = 100 * 2**20 +class _RestClientAdapter: + # Provides requests.Session-like interface for REST client + # only patch is called in the tus client + + def __init__(self, rest_client: RESTClientObject): + self.rest_client = rest_client + + def _request(self, method, url, data=None, json=None, **kwargs): + raw = self.rest_client.request( + method=method, + url=url, + headers=kwargs.get("headers"), + query_params=kwargs.get("params"), + post_params=json, + body=data, + _parse_response=False, + _request_timeout=kwargs.get("timeout"), + _check_status=False, + ) + + result = requests.Response() + result._content = raw.data + result.raw = raw + result.headers.update(raw.headers) + result.status_code = raw.status + result.reason = raw.msg + return result + + def patch(self, *args, **kwargs): + return self._request("PATCH", *args, **kwargs) + + +class _MyTusUploader(_TusUploader): + # Adjusts the library code for CVAT server + # Allows to reuse session + + def __init__(self, *_args, api_client: ApiClient, **_kwargs): + self._api_client = api_client + super().__init__(*_args, **_kwargs) + + def _do_request(self): + self.request = _TusRequest(self) + self.request.handle = _RestClientAdapter(self._api_client.rest_client) + try: + self.request.perform() + self.verify_upload() + except _TusUploadFailed as error: + self._retry_or_cry(error) + + @tus_uploader._catch_requests_error + def create_url(self): + """ + Return upload url. + + Makes request to tus server to create a new upload url for the required file upload. + """ + headers = self.headers + headers["upload-length"] = str(self.file_size) + headers["upload-metadata"] = ",".join(self.encode_metadata()) + resp = self._api_client.rest_client.POST(self.client.url, headers=headers) + url = resp.headers.get("location") + if url is None: + msg = "Attempt to retrieve create file url with status {}".format(resp.status_code) + raise tus_uploader.TusCommunicationError(msg, resp.status_code, resp.content) + return tus_uploader.urljoin(self.client.url, url) + + @tus_uploader._catch_requests_error + def get_offset(self): + """ + Return offset from tus server. + + This is different from the instance attribute 'offset' because this makes an + http request to the tus server to retrieve the offset. + """ + try: + resp = self._api_client.rest_client.HEAD(self.url, headers=self.headers) + except ApiException as ex: + if ex.status == 405: # Method Not Allowed + # In CVAT up to version 2.2.0, HEAD requests were internally + # converted to GET by mod_wsgi, and subsequently rejected by the server. + # For compatibility with old servers, we'll handle such rejections by + # restarting the upload from the beginning. + return 0 + + raise tus_uploader.TusCommunicationError( + f"Attempt to retrieve offset failed with status {ex.status}", + ex.status, + ex.body, + ) from ex + + offset = resp.headers.get("upload-offset") + if offset is None: + raise tus_uploader.TusCommunicationError( + f"Attempt to retrieve offset failed with status {resp.status}", + resp.status, + resp.data, + ) + + return int(offset) + + class Uploader: """ Implements common uploading protocols """ + _CHUNK_SIZE = 10 * 2**20 + def __init__(self, client: Client): self._client = client @@ -132,110 +242,16 @@ class Uploader: @staticmethod def _make_tus_uploader(api_client: ApiClient, url: str, **kwargs): - import tusclient.uploader as tus_uploader - from tusclient.client import TusClient, Uploader - from tusclient.request import TusRequest, TusUploadFailed - - class RestClientAdapter: - # Provides requests.Session-like interface for REST client - # only patch is called in the tus client - - def __init__(self, rest_client: RESTClientObject): - self.rest_client = rest_client - - def _request(self, method, url, data=None, json=None, **kwargs): - raw = self.rest_client.request( - method=method, - url=url, - headers=kwargs.get("headers"), - query_params=kwargs.get("params"), - post_params=json, - body=data, - _parse_response=False, - _request_timeout=kwargs.get("timeout"), - _check_status=False, - ) - - result = requests.Response() - result._content = raw.data - result.raw = raw - result.headers.update(raw.headers) - result.status_code = raw.status - result.reason = raw.msg - return result - - def patch(self, *args, **kwargs): - return self._request("PATCH", *args, **kwargs) - - class MyTusUploader(Uploader): - # Adjusts the library code for CVAT server - # Allows to reuse session - - def __init__(self, *_args, api_client: ApiClient, **_kwargs): - self._api_client = api_client - super().__init__(*_args, **_kwargs) - - def _do_request(self): - self.request = TusRequest(self) - self.request.handle = RestClientAdapter(self._api_client.rest_client) - try: - self.request.perform() - self.verify_upload() - except TusUploadFailed as error: - self._retry_or_cry(error) - - @tus_uploader._catch_requests_error - def create_url(self): - """ - Return upload url. - - Makes request to tus server to create a new upload url for the required file upload. - """ - headers = self.headers - headers["upload-length"] = str(self.file_size) - headers["upload-metadata"] = ",".join(self.encode_metadata()) - resp = self._api_client.rest_client.POST(self.client.url, headers=headers) - url = resp.headers.get("location") - if url is None: - msg = "Attempt to retrieve create file url with status {}".format( - resp.status_code - ) - raise tus_uploader.TusCommunicationError(msg, resp.status_code, resp.content) - return tus_uploader.urljoin(self.client.url, url) - - @tus_uploader._catch_requests_error - def get_offset(self): - """ - Return offset from tus server. - - This is different from the instance attribute 'offset' because this makes an - http request to the tus server to retrieve the offset. - """ - # FIXME: traefik changes HEAD to GET for some reason, and it breaks the protocol - - # Assume we are starting from scratch. This effectively disallows us to resume - # old file uploading - return 0 - - # resp = self._api_client.rest_client.HEAD(self.url, headers=self.headers) - # offset = resp.headers.get("upload-offset") - # if offset is None: - # msg = "Attempt to retrieve offset fails with status {}".format(resp.status_code) - # raise tus_uploader.TusCommunicationError(msg, resp.status_code, resp.content) - # return int(offset) - # Add headers required by CVAT server headers = {} headers["Origin"] = api_client.configuration.host headers.update(api_client.get_common_headers()) - client = TusClient(url, headers=headers) + client = _TusClient(url, headers=headers) - return MyTusUploader(client=client, api_client=api_client, **kwargs) + return _MyTusUploader(client=client, api_client=api_client, **kwargs) def _upload_file_data_with_tus(self, url, filename, *, meta=None, pbar=None, logger=None): - CHUNK_SIZE = 10 * 2**20 - file_size = os.stat(filename).st_size if pbar is None: pbar = NullProgressReporter() @@ -248,7 +264,7 @@ class Uploader: url=url.rstrip("/") + "/", metadata=meta, file_stream=input_file_with_progress, - chunk_size=CHUNK_SIZE, + chunk_size=Uploader._CHUNK_SIZE, log_func=logger, ) tus_uploader.upload() diff --git a/mod_wsgi.conf b/mod_wsgi.conf index 561a4b3d..09e615f8 100644 --- a/mod_wsgi.conf +++ b/mod_wsgi.conf @@ -2,3 +2,15 @@ LoadModule xsendfile_module /usr/lib/apache2/modules/mod_xsendfile.so XSendFile On XSendFilePath ${HOME}/data/ XSendFilePath ${HOME}/static/ + +# The presence of an Apache output filter (mod_xsendfile) causes mod_wsgi +# to internally convert HEAD requests to GET before passing them to the +# application, for reasons explained here: +# . +# However, we need HEAD requests passed through as-is, because the TUS +# protocol requires them. It should be safe to disable this functionality in +# our case, because mod_xsendfile does not examine the response body (it +# either passes it through or discards it entirely based on the headers), +# so it shouldn't matter whether the application omits the body in response +# to a HEAD request. +WSGIMapHEADToGET Off diff --git a/tests/python/sdk/test_tasks.py b/tests/python/sdk/test_tasks.py index 32c46a0a..e41ffde7 100644 --- a/tests/python/sdk/test_tasks.py +++ b/tests/python/sdk/test_tasks.py @@ -12,6 +12,7 @@ import pytest from cvat_sdk import Client, models from cvat_sdk.api_client import exceptions from cvat_sdk.core.proxies.tasks import ResourceType, Task +from cvat_sdk.core.uploading import Uploader, _MyTusUploader from PIL import Image from shared.utils.helpers import generate_image_files @@ -279,7 +280,7 @@ class TestTaskUsecases: assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1] assert self.stdout.getvalue() == "" - def test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path): + def _test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path): pbar_out = io.StringIO() pbar = make_pbar(file=pbar_out) @@ -292,6 +293,29 @@ class TestTaskUsecases: assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1] assert self.stdout.getvalue() == "" + def test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path): + self._test_can_create_from_backup(fxt_new_task, fxt_backup_file) + + def test_can_create_from_backup_in_chunks( + self, monkeypatch: pytest.MonkeyPatch, fxt_new_task: Task, fxt_backup_file: Path + ): + monkeypatch.setattr(Uploader, "_CHUNK_SIZE", 100) + + num_requests = 0 + original_do_request = _MyTusUploader._do_request + + def counting_do_request(uploader): + nonlocal num_requests + num_requests += 1 + original_do_request(uploader) + + monkeypatch.setattr(_MyTusUploader, "_do_request", counting_do_request) + + self._test_can_create_from_backup(fxt_new_task, fxt_backup_file) + + # make sure the upload was actually chunked + assert num_requests > 1 + def test_can_get_jobs(self, fxt_new_task: Task): jobs = fxt_new_task.get_jobs()