diff --git a/utils/cli/core/core.py b/utils/cli/core/core.py index 8e12bfb2..c47a2010 100644 --- a/utils/cli/core/core.py +++ b/utils/cli/core/core.py @@ -3,8 +3,8 @@ # SPDX-License-Identifier: MIT from __future__ import annotations -from contextlib import closing -from typing import Optional, Tuple +from contextlib import ExitStack, closing +from typing import Dict, List, Optional, Sequence, Tuple import tqdm import json @@ -21,8 +21,7 @@ from tusclient import client from tusclient import uploader from tusclient.request import TusRequest, TusUploadFailed -from utils.cli.core.utils import StreamWithProgress - +from .utils import StreamWithProgress, expect_status from .definition import ResourceType log = logging.getLogger(__name__) @@ -33,17 +32,47 @@ class CLI: self.session = session self.login(credentials) - def tasks_data(self, task_id, resource_type, resources, **kwargs): + def tasks_data(self, task_id: int, resource_type: ResourceType, + resources: Sequence[str], *, pbar: tqdm.tqdm = None, **kwargs) -> None: """ Add local, remote, or shared files to an existing task. """ url = self.api.tasks_id_data(task_id) data = {} - files = None + if resource_type == ResourceType.LOCAL: - files = {'client_files[{}]'.format(i): open(f, 'rb') for i, f in enumerate(resources)} + bulk_files: Dict[str, int] = {} + separate_files: Dict[str, int] = {} + + MAX_REQUEST_SIZE = 100 * 2**20 + + for filename in resources: + filename = os.path.abspath(filename) + file_size = os.stat(filename).st_size + if MAX_REQUEST_SIZE < file_size: + separate_files[filename] = file_size + else: + bulk_files[filename] = file_size + + total_size = sum(bulk_files.values()) + sum(separate_files.values()) + + # split files by requests + bulk_file_groups = [] + current_group_size = 0 + current_group = [] + for filename, file_size in bulk_files.items(): + if MAX_REQUEST_SIZE < current_group_size + file_size: + bulk_file_groups.append((current_group, current_group_size)) + current_group_size = 0 + current_group = [] + + current_group.append(filename) + current_group_size += file_size + if current_group: + bulk_file_groups.append((current_group, current_group_size)) elif resource_type == ResourceType.REMOTE: data = {'remote_files[{}]'.format(i): f for i, f in enumerate(resources)} elif resource_type == ResourceType.SHARE: data = {'server_files[{}]'.format(i): f for i, f in enumerate(resources)} + data['image_quality'] = 70 ## capture additional kwargs @@ -54,8 +83,38 @@ class CLI: if kwargs.get('frame_step') is not None: data['frame_filter'] = f"step={kwargs.get('frame_step')}" - response = self.session.post(url, data=data, files=files) - response.raise_for_status() + if resource_type in [ResourceType.REMOTE, ResourceType.SHARE]: + response = self.session.post(url, data=data) + response.raise_for_status() + elif resource_type == ResourceType.LOCAL: + if pbar is None: + pbar = self._make_pbar("Uploading files...") + + if pbar is not None: + pbar.reset(total_size) + + self._tus_start_upload(url) + + for group, group_size in bulk_file_groups: + with ExitStack() as es: + group_files = {} + for i, filename in enumerate(group): + group_files[f'client_files[{i}]'] = ( + filename, + es.enter_context(closing(open(filename, 'rb'))) + ) + response = self.session.post(url, data=data, + files=group_files, headers={'Upload-Multiple': ''}) + expect_status(200, response) + + if pbar is not None: + pbar.update(group_size) + + for filename in separate_files: + self._upload_file_with_tus(url, filename, + pbar=pbar, logger=log.debug) + + self._tus_finish_upload(url, data=data) def tasks_list(self, use_json_output, **kwargs): """ List all tasks in either basic or JSON format. """ @@ -82,18 +141,20 @@ class CLI: response.raise_for_status() return output - def tasks_create(self, name, labels, resource_type, resources, - annotation_path='', annotation_format='CVAT XML 1.1', - completion_verification_period=20, - git_completion_verification_period=2, - dataset_repository_url='', - lfs=False, **kwargs) -> int: + def tasks_create(self, name: str, labels: List[Dict[str, str]], + resource_type: ResourceType, resources: Sequence[str], *, + annotation_path='', annotation_format='CVAT XML 1.1', + completion_verification_period=20, + git_completion_verification_period=2, + dataset_repository_url='', + lfs=False, pbar: tqdm.tqdm = None, **kwargs) -> int: """ Create a new task with the given name and labels JSON and add the files to it. Returns: id of the created task """ + url = self.api.tasks labels = [] if kwargs.get('project_id') is not None else labels data = {'name': name, @@ -111,7 +172,7 @@ class CLI: task_id = response_json['id'] assert isinstance(task_id, int) - self.tasks_data(task_id, resource_type, resources, **kwargs) + self.tasks_data(task_id, resource_type, resources, pbar=pbar, **kwargs) if annotation_path != '': url = self.api.tasks_id_status(task_id) @@ -129,7 +190,7 @@ class CLI: response_json['message']) log.info(logger_string) - self.tasks_upload(task_id, annotation_format, annotation_path, **kwargs) + self.tasks_upload(task_id, annotation_format, annotation_path, pbar=pbar, **kwargs) if dataset_repository_url: response = self.session.post( @@ -277,7 +338,7 @@ class CLI: return MyTusUploader(client=tus_client, session=cli.session, **kwargs) def _upload_file_data_with_tus(self, url, filename, *, params=None, pbar=None, logger=None): - CHUNK_SIZE = 2**20 + CHUNK_SIZE = 10 * 2**20 file_size = os.stat(filename).st_size @@ -291,20 +352,20 @@ class CLI: def _upload_file_with_tus(self, url, filename, *, params=None, pbar=None, logger=None): # "CVAT-TUS" protocol has 2 extra messages - response = self.session.post(url, headers={'Upload-Start': ''}, params=params) - response.raise_for_status() - if response.status_code != 202: - raise Exception("Failed to upload file: " - f"unexpected status code received ({response.status_code})") - + self._tus_start_upload(url, params=params) self._upload_file_data_with_tus(url=url, filename=filename, params=params, pbar=pbar, logger=logger) + return self._tus_finish_upload(url, params=params) - response = self.session.post(url, headers={'Upload-Finish': ''}, params=params) - response.raise_for_status() - if response.status_code != 202: - raise Exception("Failed to upload file: " - f"unexpected status code received ({response.status_code})") + def _tus_start_upload(self, url, *, params=None): + response = self.session.post(url, headers={'Upload-Start': ''}, params=params) + expect_status(202, response) + return response + + def _tus_finish_upload(self, url, *, params=None, data=None): + response = self.session.post(url, headers={'Upload-Finish': ''}, params=params, data=data) + expect_status(202, response) + return response def tasks_upload(self, task_id, fileformat, filename, *, completion_check_period=2, pbar=None, **kwargs): @@ -358,37 +419,32 @@ class CLI: """ Import a task from a backup file""" url = self.api.tasks_backup() + params = { + 'filename': os.path.basename(filename) + } if pbar is None: pbar = self._make_pbar("Uploading...") - file_size = os.stat(filename).st_size - - with open(filename, 'rb') as input_file: - input_stream = StreamWithProgress(input_file, pbar=pbar, length=file_size) - - response = self.session.post( - url, - files={'task_file': input_stream} - ) - response.raise_for_status() + response = self._upload_file_with_tus(url, filename, + params=params, pbar=pbar, logger=log.debug) response_json = response.json() rq_id = response_json['rq_id'] # check task status while True: sleep(completion_check_period) + response = self.session.post( url, data={'rq_id': rq_id} ) - response.raise_for_status() if response.status_code == 201: break + expect_status(202, response) task_id = response.json()['id'] - logger_string = "Task has been imported sucessfully. Task ID: {}".format(task_id) - log.info(logger_string) + log.info(f"Task has been imported sucessfully. Task ID: {task_id}") def login(self, credentials): url = self.api.login @@ -408,7 +464,7 @@ class CLI: to the requested name. """ - CHUNK_SIZE = 2**20 + CHUNK_SIZE = 10 * 2**20 assert not osp.exists(output_path) diff --git a/utils/cli/core/utils.py b/utils/cli/core/utils.py index fbea0a70..7c9b3215 100644 --- a/utils/cli/core/utils.py +++ b/utils/cli/core/utils.py @@ -5,6 +5,8 @@ from __future__ import annotations import io + +import requests import tqdm @@ -34,3 +36,9 @@ class StreamWithProgress: def tell(self): return self.stream.tell() + +def expect_status(code: int, response: requests.Response) -> None: + response.raise_for_status() + if response.status_code != code: + raise Exception("Failed to upload file: " + f"unexpected status code received ({response.status_code})") diff --git a/utils/cli/tests/test_cli.py b/utils/cli/tests/test_cli.py index 52f1be62..e014ca19 100644 --- a/utils/cli/tests/test_cli.py +++ b/utils/cli/tests/test_cli.py @@ -8,6 +8,7 @@ import logging import os import sys import unittest +from unittest.mock import MagicMock from django.conf import settings from PIL import Image @@ -22,6 +23,53 @@ from tqdm import tqdm class TestCLI(APITestCase): + @unittest.mock.patch('sys.stdout', new_callable=io.StringIO) + def setUp(self, mock_stdout): + log = logging.getLogger('utils.cli.core') + log.setLevel(logging.INFO) + log.addHandler(logging.StreamHandler(sys.stdout)) + self.mock_stdout = mock_stdout + + self.client = RequestsClient() + self.credentials = ('admin', 'admin') + self.api = CVAT_API_V2('testserver') + self.cli = CLI(self.client, self.api, self.credentials) + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.img_file = os.path.join(settings.SHARE_ROOT, 'test_cli.jpg') + _, data = generate_image_file(cls.img_file) + with open(cls.img_file, 'wb') as image: + image.write(data.read()) + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + os.remove(cls.img_file) + + def tearDown(self): + super().tearDown() + log.close_all() # Release logging resources correctly + + @classmethod + def setUpTestData(cls): + create_db_users(cls) + + def test_tasks_create(self): + with closing(io.StringIO()) as pbar_out: + pbar = tqdm(file=pbar_out, mininterval=0) + + task_id = self.cli.tasks_create('test_task', + [{'name' : 'car'}, {'name': 'person'}], + ResourceType.LOCAL, [self.img_file], pbar=pbar) + + pbar_out = pbar_out.getvalue().strip('\r').split('\r') + + self.assertEqual(1, task_id) + self.assertRegex(pbar_out[-1], '100%') + +class TestTaskOperations(APITestCase): @unittest.mock.patch('sys.stdout', new_callable=io.StringIO) def setUp(self, mock_stdout): self.client = RequestsClient() @@ -32,7 +80,7 @@ class TestCLI(APITestCase): self.task_id = self.cli.tasks_create(self.taskname, [{'name' : 'car'}, {'name': 'person'}], ResourceType.LOCAL, - [self.img_file]) + [self.img_file], pbar=MagicMock()) # redirect logging to mocked stdout to test program output self.mock_stdout = mock_stdout log = logging.getLogger('utils.cli.core') @@ -72,11 +120,12 @@ class TestCLI(APITestCase): @scoped def test_tasks_dump(self): path = os.path.join(settings.SHARE_ROOT, 'test_cli.zip') - on_exit_do(os.remove, path) with closing(io.StringIO()) as pbar_out: pbar = tqdm(file=pbar_out, mininterval=0) + self.cli.tasks_dump(self.task_id, 'CVAT for images 1.1', path, pbar=pbar) + on_exit_do(os.remove, path) pbar_out = pbar_out.getvalue().strip('\r').split('\r') @@ -86,11 +135,12 @@ class TestCLI(APITestCase): @scoped def test_tasks_export(self): path = os.path.join(settings.SHARE_ROOT, 'test_cli.zip') - on_exit_do(os.remove, path) with closing(io.StringIO()) as pbar_out: pbar = tqdm(file=pbar_out, mininterval=0) + self.cli.tasks_export(self.task_id, path, pbar=pbar) + on_exit_do(os.remove, path) pbar_out = pbar_out.getvalue().strip('\r').split('\r') @@ -100,85 +150,119 @@ class TestCLI(APITestCase): @scoped def test_tasks_frame_original(self): path = os.path.join(settings.SHARE_ROOT, 'task_1_frame_000000.jpg') - on_exit_do(os.remove, path) self.cli.tasks_frame(self.task_id, [0], outdir=settings.SHARE_ROOT, quality='original') + on_exit_do(os.remove, path) + self.assertTrue(os.path.exists(path)) @scoped def test_tasks_frame(self): path = os.path.join(settings.SHARE_ROOT, 'task_1_frame_000000.jpg') - on_exit_do(os.remove, path) self.cli.tasks_frame(self.task_id, [0], outdir=settings.SHARE_ROOT, quality='compressed') + on_exit_do(os.remove, path) + self.assertTrue(os.path.exists(path)) @scoped def test_tasks_upload(self): - test_image = Image.open(self.img_file) - width, height = test_image.size - - # Using generate_coco_anno() from: - # https://github.com/opencv/cvat/blob/develop/cvat/apps/engine/tests/test_rest_api.py - def generate_coco_anno(): - return b"""{ - "categories": [ - { - "id": 1, - "name": "car", - "supercategory": "" - }, - { - "id": 2, - "name": "person", - "supercategory": "" - } - ], - "images": [ - { - "coco_url": "", - "date_captured": "", - "flickr_url": "", - "license": 0, - "id": 0, - "file_name": "test_cli.jpg", - "height": %d, - "width": %d - } - ], - "annotations": [ - { - "category_id": 1, - "id": 1, - "image_id": 0, - "iscrowd": 0, - "segmentation": [ - [] - ], - "area": 17702.0, - "bbox": [ - 574.0, - 407.0, - 167.0, - 106.0 - ] - } - ] - }""" - content = generate_coco_anno() % (height, width) - path = os.path.join(settings.SHARE_ROOT, 'test_cli.json') - with open(path, "wb") as coco: - coco.write(content) + self._generate_coco_file(path) on_exit_do(os.remove, path) with closing(io.StringIO()) as pbar_out: pbar = tqdm(file=pbar_out, mininterval=0) + self.cli.tasks_upload(self.task_id, 'COCO 1.0', path, pbar=pbar) pbar_out = pbar_out.getvalue().strip('\r').split('\r') self.assertRegex(self.mock_stdout.getvalue(), '.*{}.*'.format("annotation file")) self.assertRegex(pbar_out[-1], '100%') + + @scoped + def test_tasks_import(self): + anno_path = os.path.join(settings.SHARE_ROOT, 'test_cli.json') + self._generate_coco_file(anno_path) + on_exit_do(os.remove, anno_path) + + backup_path = os.path.join(settings.SHARE_ROOT, 'task_backup.zip') + with closing(io.StringIO()) as pbar_out: + pbar = tqdm(file=pbar_out, mininterval=0) + self.cli.tasks_upload(self.task_id, 'COCO 1.0', anno_path, pbar=pbar) + self.cli.tasks_export(self.task_id, backup_path, pbar=pbar) + on_exit_do(os.remove, backup_path) + + with closing(io.StringIO()) as pbar_out: + pbar = tqdm(file=pbar_out, mininterval=0) + + self.cli.tasks_import(backup_path, pbar=pbar) + + pbar_out = pbar_out.getvalue().strip('\r').split('\r') + + self.assertRegex(self.mock_stdout.getvalue(), '.*{}.*'.format("exported sucessfully")) + self.assertRegex(pbar_out[-1], '100%') + + def _generate_coco_file(self, path): + test_image = Image.open(self.img_file) + image_width, image_height = test_image.size + + content = self._generate_coco_anno(os.path.basename(self.img_file), + image_width=image_width, image_height=image_height) + with open(path, "w") as coco: + coco.write(content) + + @staticmethod + def _generate_coco_anno(image_path, image_width, image_height): + return """{ + "categories": [ + { + "id": 1, + "name": "car", + "supercategory": "" + }, + { + "id": 2, + "name": "person", + "supercategory": "" + } + ], + "images": [ + { + "coco_url": "", + "date_captured": "", + "flickr_url": "", + "license": 0, + "id": 0, + "file_name": "%(image_path)s", + "height": %(image_height)d, + "width": %(image_width)d + } + ], + "annotations": [ + { + "category_id": 1, + "id": 1, + "image_id": 0, + "iscrowd": 0, + "segmentation": [ + [] + ], + "area": 17702.0, + "bbox": [ + 574.0, + 407.0, + 167.0, + 106.0 + ] + } + ] + } + """ % { + 'image_path': image_path, + 'image_height': image_height, + 'image_width': image_width + }