From dfa71badb593878424b910c986b1959725d7e032 Mon Sep 17 00:00:00 2001
From: Maxim Zhiltsov
Date: Tue, 14 Jun 2022 10:40:11 +0300
Subject: [PATCH] Add progressbar to cli (#46)

---
 CHANGELOG.md                       |   1 +
 Dockerfile.ci                      |   6 +-
 utils/cli/cli.py                   |   6 +-
 utils/cli/core/core.py             | 272 +++++++++++++++---
 utils/cli/core/utils.py            |  36 +++
 .../base.txt}                      |   2 +
 utils/cli/requirements/testing.txt |   4 +
 utils/cli/tests/test_cli.py        |  60 +++-
 8 files changed, 332 insertions(+), 55 deletions(-)
 create mode 100644 utils/cli/core/utils.py
 rename utils/cli/{requirements.txt => requirements/base.txt} (54%)
 create mode 100644 utils/cli/requirements/testing.txt

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 62ba1292..5241d2f8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Support of attributes returned by serverless functions () based on ()
 - Project/task backups uploading via chunk uploads ()
 - Fixed UX bug when jobs pagination is reset after changing a job ()
+- Progressbars in CLI for file uploading and downloading ()
 
 ### Changed
 - Bumped nuclio version to 1.8.14 ()
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 4fc6512b..ca594138 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -21,9 +21,11 @@ RUN apt-get update && \
         && \
     rm -rf /var/lib/apt/lists/*;
 
-COPY cvat/requirements/ /tmp/requirements/
+COPY cvat/requirements/ /tmp/cvat/requirements/
+COPY utils/cli/requirements/ /tmp/utils/cli/requirements/
 
-RUN DATUMARO_HEADLESS=1 python3 -m pip install --no-cache-dir -r /tmp/requirements/${DJANGO_CONFIGURATION}.txt && \
+RUN DATUMARO_HEADLESS=1 python3 -m pip install --no-cache-dir -r /tmp/cvat/requirements/${DJANGO_CONFIGURATION}.txt && \
+    python3 -m pip install --no-cache-dir -r /tmp/utils/cli/requirements/testing.txt && \
     python3 -m pip install --no-cache-dir coveralls
 
 RUN gem install coveralls-lcov
diff --git a/utils/cli/cli.py b/utils/cli/cli.py
index 9b4183b9..dd02d076 100755
--- a/utils/cli/cli.py
+++ b/utils/cli/cli.py
@@ -12,7 +12,11 @@ log = logging.getLogger(__name__)
 
 def config_log(level):
     log = logging.getLogger('core')
-    log.addHandler(logging.StreamHandler(sys.stdout))
+    formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S', style='%')
+    handler = logging.StreamHandler(sys.stdout)
+    handler.setFormatter(formatter)
+    log.addHandler(handler)
     log.setLevel(level)
     if level <= logging.DEBUG:
         HTTPConnection.debuglevel = 1
diff --git a/utils/cli/core/core.py b/utils/cli/core/core.py
index c2c0d3bd..8e12bfb2 100644
--- a/utils/cli/core/core.py
+++ b/utils/cli/core/core.py
@@ -2,23 +2,33 @@
 #
 # SPDX-License-Identifier: MIT
 
+from __future__ import annotations
+from contextlib import closing
+from typing import Optional, Tuple
+
+import tqdm
 import json
 import logging
 import os
+import os.path as osp
 import requests
 from io import BytesIO
 import mimetypes
 from time import sleep
 from PIL import Image
+from tusclient import client
+from tusclient import uploader
+from tusclient.request import TusRequest, TusUploadFailed
+
+from utils.cli.core.utils import StreamWithProgress
 
 from .definition import ResourceType
 
 log = logging.getLogger(__name__)
 
 
-class CLI():
-
-    def __init__(self, session, api, credentials):
+class CLI:
+    def __init__(self, session: requests.Session, api: CVAT_API_V2, credentials: Tuple[str, str]):
        self.api = api
        self.session = session
        self.login(credentials)
@@ -77,9 +87,13 @@ class CLI():
                      completion_verification_period=20,
                      git_completion_verification_period=2,
                      dataset_repository_url='',
-                     lfs=False, **kwargs):
-        """ Create a new task with the given name and labels JSON and
-        add the files to it. """
+                     lfs=False, **kwargs) -> int:
+        """
+        Create a new task with the given name and labels JSON and
+        add the files to it.
+
+        Returns: id of the created task
+        """
         url = self.api.tasks
         labels = [] if kwargs.get('project_id') is not None else labels
         data = {'name': name,
@@ -94,7 +108,9 @@ class CLI():
         response.raise_for_status()
         response_json = response.json()
         log.info('Created task ID: {id} NAME: {name}'.format(**response_json))
+
         task_id = response_json['id']
+        assert isinstance(task_id, int)
         self.tasks_data(task_id, resource_type, resources, **kwargs)
 
         if annotation_path != '':
@@ -114,6 +130,7 @@ class CLI():
             log.info(logger_string)
             self.tasks_upload(task_id, annotation_format, annotation_path,
                 **kwargs)
+
         if dataset_repository_url:
             response = self.session.post(
                 self.api.git_create(task_id),
@@ -140,6 +157,8 @@ class CLI():
 
             log.info(f"Dataset repository creation completed with status: {response_json['status']}.")
 
+        return task_id
+
     def tasks_delete(self, task_ids, **kwargs):
         """ Delete a list of tasks, ignoring those which don't exist. """
         for task_id in task_ids:
@@ -173,9 +192,12 @@ class CLI():
             outfile = 'task_{}_frame_{:06d}{}'.format(task_id, frame_id, im_ext)
             im.save(os.path.join(outdir, outfile))
 
-    def tasks_dump(self, task_id, fileformat, filename, **kwargs):
-        """ Download annotations for a task in the specified format
-        (e.g. 'YOLO ZIP 1.0')."""
+    def tasks_dump(self, task_id, fileformat, filename, *,
+            pbar=None, completion_check_period=2, **kwargs) -> None:
+        """
+        Download annotations for a task in the specified format (e.g. 'YOLO ZIP 1.0').
+        """
+
         url = self.api.tasks_id(task_id)
         response = self.session.get(url)
         response.raise_for_status()
@@ -184,70 +206,178 @@ class CLI():
         url = self.api.tasks_id_annotations_filename(task_id,
             response_json['name'], fileformat)
+
+        log.info("Waiting for the server to prepare the file...")
+
         while True:
             response = self.session.get(url)
             response.raise_for_status()
             log.info('STATUS {}'.format(response.status_code))
             if response.status_code == 201:
                 break
+            sleep(completion_check_period)
+
+        if pbar is None:
+            pbar = self._make_pbar("Downloading")
+        self._download_file(url + '&action=download', output_path=filename, pbar=pbar)
+
+        log.info(f"Annotations have been exported to {filename}")
+
+    def _make_tus_uploader(cli, url, **kwargs):
+        # Adjusts the library code for the CVAT server
+        # and allows reusing the session
+        class MyTusUploader(client.Uploader):
+            def __init__(self, *_args, session: requests.Session = None, **_kwargs):
+                self._session = session
+                super().__init__(*_args, **_kwargs)
+
+            def _do_request(self):
+                self.request = TusRequest(self)
+                self.request.handle = self._session
+                try:
+                    self.request.perform()
+                    self.verify_upload()
+                except TusUploadFailed as error:
+                    self._retry_or_cry(error)
+
+            @uploader._catch_requests_error
+            def create_url(self):
+                """
+                Return upload url.
+
+                Makes a request to the tus server to create a new upload url for the required file upload.
+ """ + headers = self.headers + headers['upload-length'] = str(self.file_size) + headers['upload-metadata'] = ','.join(self.encode_metadata()) + headers['origin'] = cli.api.host # required by CVAT server + resp = self._session.post(self.client.url, headers=headers) + url = resp.headers.get("location") + if url is None: + msg = 'Attempt to retrieve create file url with status {}'.format(resp.status_code) + raise uploader.TusCommunicationError(msg, resp.status_code, resp.content) + return uploader.urljoin(self.client.url, url) + + @uploader._catch_requests_error + def get_offset(self): + """ + Return offset from tus server. + + This is different from the instance attribute 'offset' because this makes an + http request to the tus server to retrieve the offset. + """ + resp = self._session.head(self.url, headers=self.headers) + offset = resp.headers.get('upload-offset') + if offset is None: + msg = 'Attempt to retrieve offset fails with status {}'.format(resp.status_code) + raise uploader.TusCommunicationError(msg, resp.status_code, resp.content) + return int(offset) + + tus_client = client.TusClient(url, headers=cli.session.headers) + return MyTusUploader(client=tus_client, session=cli.session, **kwargs) + + def _upload_file_data_with_tus(self, url, filename, *, params=None, pbar=None, logger=None): + CHUNK_SIZE = 2**20 + + file_size = os.stat(filename).st_size + + with open(filename, 'rb') as input_file: + if pbar is not None: + input_file = StreamWithProgress(input_file, pbar, length=file_size) - response = self.session.get(url + '&action=download') + tus_uploader = self._make_tus_uploader(url + '/', metadata=params, + file_stream=input_file, chunk_size=CHUNK_SIZE, log_func=logger) + tus_uploader.upload() + + def _upload_file_with_tus(self, url, filename, *, params=None, pbar=None, logger=None): + # "CVAT-TUS" protocol has 2 extra messages + response = self.session.post(url, headers={'Upload-Start': ''}, params=params) response.raise_for_status() + if response.status_code != 202: + raise Exception("Failed to upload file: " + f"unexpected status code received ({response.status_code})") + + self._upload_file_data_with_tus(url=url, filename=filename, + params=params, pbar=pbar, logger=logger) - with open(filename, 'wb') as fp: - fp.write(response.content) + response = self.session.post(url, headers={'Upload-Finish': ''}, params=params) + response.raise_for_status() + if response.status_code != 202: + raise Exception("Failed to upload file: " + f"unexpected status code received ({response.status_code})") - def tasks_upload(self, task_id, fileformat, filename, **kwargs): + def tasks_upload(self, task_id, fileformat, filename, *, + completion_check_period=2, pbar=None, **kwargs): """ Upload annotations for a task in the specified format (e.g. 
         'YOLO ZIP 1.0')."""
-        url = self.api.tasks_id_annotations_format(task_id, fileformat)
+        url = self.api.tasks_id_annotations(task_id)
+        params = {
+            'format': fileformat,
+            'filename': os.path.basename(filename)
+        }
+
+        if pbar is None:
+            pbar = self._make_pbar("Uploading...")
+
+        self._upload_file_with_tus(url, filename, params=params, pbar=pbar, logger=log.debug)
+
         while True:
-            response = self.session.put(
-                url,
-                files={'annotation_file': open(filename, 'rb')}
-            )
+            response = self.session.put(url, params=params)
             response.raise_for_status()
             if response.status_code == 201:
                 break
-        logger_string = "Upload job for Task ID {} ".format(task_id) +\
-            "with annotation file {} finished".format(filename)
-        log.info(logger_string)
+            sleep(completion_check_period)
 
-    def tasks_export(self, task_id, filename, export_verification_period=3, **kwargs):
-        """ Export and download a whole task """
-        export_url = self.api.tasks_id(task_id) + '/backup'
+        log.info(f"Upload job for Task ID {task_id} "
+            f"with annotation file {filename} finished")
 
+    def tasks_export(self, task_id, filename, *,
+            completion_check_period=2, pbar=None, **kwargs):
+        """ Download a task backup """
+        log.info("Waiting for the server to prepare the file...")
+
+        url = self.api.tasks_id_backup(task_id)
         while True:
-            response = self.session.get(export_url)
+            response = self.session.get(url)
             response.raise_for_status()
             log.info('STATUS {}'.format(response.status_code))
             if response.status_code == 201:
                 break
-            sleep(export_verification_period)
+            sleep(completion_check_period)
 
-        response = self.session.get(export_url + '?action=download')
-        response.raise_for_status()
+        if pbar is None:
+            pbar = self._make_pbar("Downloading")
+        self._download_file(url + '?action=download', output_path=filename, pbar=pbar)
 
-        with open(filename, 'wb') as fp:
-            fp.write(response.content)
-        logger_string = "Task {} has been exported sucessfully. ".format(task_id) +\
-            "to {}".format(os.path.abspath(filename))
-        log.info(logger_string)
+        log.info(f"Task {task_id} has been exported successfully "
+            f"to {os.path.abspath(filename)}")
+
+    def tasks_import(self, filename, *,
+            completion_check_period=2, pbar=None, **kwargs) -> None:
+        """ Import a task from a backup file """
+
+        url = self.api.tasks_backup()
+
+        if pbar is None:
+            pbar = self._make_pbar("Uploading...")
+
+        file_size = os.stat(filename).st_size
 
-    def tasks_import(self, filename, import_verification_period=3, **kwargs):
-        """ Import a task"""
-        url = self.api.tasks + '/backup'
         with open(filename, 'rb') as input_file:
+            input_stream = StreamWithProgress(input_file, pbar=pbar, length=file_size)
+
             response = self.session.post(
                 url,
-                files={'task_file': input_file}
+                files={'task_file': input_stream}
             )
         response.raise_for_status()
         response_json = response.json()
         rq_id = response_json['rq_id']
+
+        # check task status
         while True:
-            sleep(import_verification_period)
+            sleep(completion_check_period)
             response = self.session.post(
                 url,
                 data={'rq_id': rq_id}
@@ -268,8 +398,56 @@ class CLI():
         if 'csrftoken' in response.cookies:
             self.session.headers['X-CSRFToken'] = response.cookies['csrftoken']
 
+    def _make_pbar(self, title: str = None) -> tqdm.tqdm:
+        return tqdm.tqdm(unit_scale=True, unit='B', unit_divisor=1024, desc=title)
 
-class CVAT_API_V2():
+    def _download_file(self, url: str, output_path: str, *,
+            timeout: int = 60, pbar: Optional[tqdm.tqdm] = None) -> None:
+        """
+        Downloads the file from url into a temporary file, then renames it
+        to the requested name.
+ """ + + CHUNK_SIZE = 2**20 + + assert not osp.exists(output_path) + + tmp_path = output_path + ".tmp" + if osp.exists(tmp_path): + raise FileExistsError(f"Can't write temporary file '{tmp_path}' - file exists") + + response = self.session.get(url, timeout=timeout, stream=True) + + with closing(response): + response.raise_for_status() + + try: + file_size = int(response.headers.get('Content-Length', 0)) + except (ValueError, KeyError): + file_size = None + + try: + with open(tmp_path, "wb") as fd: + if pbar is not None: + pbar.reset(file_size) + + try: + for chunk in response.iter_content(chunk_size=CHUNK_SIZE): + if pbar is not None: + pbar.update(n=len(chunk)) + + fd.write(chunk) + finally: + if pbar is not None: + pbar.close() + + os.rename(tmp_path, output_path) + except: + os.unlink(tmp_path) + raise + + +class CVAT_API_V2: """ Build parameterized API URLs """ def __init__(self, host, https=False): @@ -279,7 +457,8 @@ class CVAT_API_V2(): host = host.replace('http://', '') host = host.replace('https://', '') scheme = 'https' if https else 'http' - self.base = '{}://{}/api/'.format(scheme, host) + self.host = '{}://{}'.format(scheme, host) + self.base = self.host + '/api/' self.git = f'{scheme}://{host}/git/repository/' @property @@ -289,6 +468,9 @@ class CVAT_API_V2(): def tasks_page(self, page_id): return self.tasks + '?page={}'.format(page_id) + def tasks_backup(self): + return self.tasks + '/backup' + def tasks_id(self, task_id): return self.tasks + '/{}'.format(task_id) @@ -301,12 +483,18 @@ class CVAT_API_V2(): def tasks_id_status(self, task_id): return self.tasks_id(task_id) + '/status' + def tasks_id_backup(self, task_id): + return self.tasks_id(task_id) + '/backup' + + def tasks_id_annotations(self, task_id): + return self.tasks_id(task_id) + '/annotations' + def tasks_id_annotations_format(self, task_id, fileformat): - return self.tasks_id(task_id) + '/annotations?format={}' \ + return self.tasks_id_annotations(task_id) + '?format={}' \ .format(fileformat) def tasks_id_annotations_filename(self, task_id, name, fileformat): - return self.tasks_id(task_id) + '/annotations?format={}&filename={}' \ + return self.tasks_id_annotations(task_id) + '?format={}&filename={}' \ .format(fileformat, name) def git_create(self, task_id): diff --git a/utils/cli/core/utils.py b/utils/cli/core/utils.py new file mode 100644 index 00000000..fbea0a70 --- /dev/null +++ b/utils/cli/core/utils.py @@ -0,0 +1,36 @@ +# Copyright (C) 2022 Intel Corporation +# +# SPDX-License-Identifier: MIT + +from __future__ import annotations + +import io +import tqdm + + +class StreamWithProgress: + def __init__(self, stream: io.RawIOBase, pbar: tqdm.tqdm, length=None): + self.stream = stream + self.pbar = pbar + + if hasattr(stream, '__len__'): + length = len(stream) + + self.length = length + pbar.reset(length) + + def read(self, size=-1): + chunk = self.stream.read(size) + if chunk is not None: + self.pbar.update(n=len(chunk)) + return chunk + + def __len__(self): + return self.length + + def seek(self, pos, start=0): + self.stream.seek(pos, start) + self.pbar.n = pos + + def tell(self): + return self.stream.tell() diff --git a/utils/cli/requirements.txt b/utils/cli/requirements/base.txt similarity index 54% rename from utils/cli/requirements.txt rename to utils/cli/requirements/base.txt index 14cc33a6..945b4ee7 100644 --- a/utils/cli/requirements.txt +++ b/utils/cli/requirements/base.txt @@ -1,2 +1,4 @@ Pillow>=6.2.0 requests>=2.20.1 +tuspy==0.2.5 +tqdm>=4.64.0 diff --git a/utils/cli/requirements/testing.txt 
new file mode 100644
index 00000000..01b4c5f5
--- /dev/null
+++ b/utils/cli/requirements/testing.txt
@@ -0,0 +1,4 @@
+-r base.txt
+
+# We depend on the server in tests
+-r ../../../cvat/requirements/testing.txt
diff --git a/utils/cli/tests/test_cli.py b/utils/cli/tests/test_cli.py
index 2d008b37..52f1be62 100644
--- a/utils/cli/tests/test_cli.py
+++ b/utils/cli/tests/test_cli.py
@@ -2,6 +2,7 @@
 #
 # SPDX-License-Identifier: MIT
 
+from contextlib import closing
 import io
 import logging
 import os
@@ -11,11 +12,13 @@ import unittest
 from django.conf import settings
 from PIL import Image
 from rest_framework.test import APITestCase, RequestsClient
+from datumaro.util.scope import scoped, on_exit_do
 
 import cvat.apps.engine.log as log
 from cvat.apps.engine.tests.test_rest_api import (create_db_users,
                                                   generate_image_file)
 from utils.cli.core import CLI, CVAT_API_V2, ResourceType
+from tqdm import tqdm
 
 
 class TestCLI(APITestCase):
@@ -26,7 +29,7 @@ class TestCLI(APITestCase):
         self.api = CVAT_API_V2('testserver')
         self.cli = CLI(self.client, self.api, self.credentials)
         self.taskname = 'test_task'
-        self.cli.tasks_create(self.taskname,
+        self.task_id = self.cli.tasks_create(self.taskname,
             [{'name' : 'car'}, {'name': 'person'}],
             ResourceType.LOCAL,
             [self.img_file])
@@ -66,24 +69,53 @@ class TestCLI(APITestCase):
         self.cli.tasks_list(False)
         self.assertRegex(self.mock_stdout.getvalue(), '.*Task ID {} deleted.*'.format(1))
 
+    @scoped
     def test_tasks_dump(self):
-        path = os.path.join(settings.SHARE_ROOT, 'test_cli.xml')
-        self.cli.tasks_dump(1, 'CVAT for images 1.1', path)
+        path = os.path.join(settings.SHARE_ROOT, 'test_cli.zip')
+        on_exit_do(os.remove, path)
+
+        with closing(io.StringIO()) as pbar_out:
+            pbar = tqdm(file=pbar_out, mininterval=0)
+            self.cli.tasks_dump(self.task_id, 'CVAT for images 1.1', path, pbar=pbar)
+
+        pbar_out = pbar_out.getvalue().strip('\r').split('\r')
+
+        self.assertTrue(os.path.exists(path))
+        self.assertRegex(pbar_out[-1], '100%')
+
+    @scoped
+    def test_tasks_export(self):
+        path = os.path.join(settings.SHARE_ROOT, 'test_cli.zip')
+        on_exit_do(os.remove, path)
+
+        with closing(io.StringIO()) as pbar_out:
+            pbar = tqdm(file=pbar_out, mininterval=0)
+            self.cli.tasks_export(self.task_id, path, pbar=pbar)
+
+        pbar_out = pbar_out.getvalue().strip('\r').split('\r')
+
         self.assertTrue(os.path.exists(path))
-        os.remove(path)
+        self.assertRegex(pbar_out[-1], '100%')
 
+    @scoped
     def test_tasks_frame_original(self):
         path = os.path.join(settings.SHARE_ROOT, 'task_1_frame_000000.jpg')
-        self.cli.tasks_frame(1, [0], outdir=settings.SHARE_ROOT, quality='original')
+        on_exit_do(os.remove, path)
+
+        self.cli.tasks_frame(self.task_id, [0],
+            outdir=settings.SHARE_ROOT, quality='original')
         self.assertTrue(os.path.exists(path))
-        os.remove(path)
 
+    @scoped
     def test_tasks_frame(self):
         path = os.path.join(settings.SHARE_ROOT, 'task_1_frame_000000.jpg')
-        self.cli.tasks_frame(1, [0], outdir=settings.SHARE_ROOT, quality='compressed')
+        on_exit_do(os.remove, path)
+
+        self.cli.tasks_frame(self.task_id, [0],
+            outdir=settings.SHARE_ROOT, quality='compressed')
         self.assertTrue(os.path.exists(path))
-        os.remove(path)
 
+    @scoped
     def test_tasks_upload(self):
         test_image = Image.open(self.img_file)
         width, height = test_image.size
@@ -136,9 +168,17 @@ class TestCLI(APITestCase):
             ]
         }"""
         content = generate_coco_anno() % (height, width)
+
         path = os.path.join(settings.SHARE_ROOT, 'test_cli.json')
         with open(path, "wb") as coco:
             coco.write(content)
-        self.cli.tasks_upload(1, 'COCO 1.0', path)
+        on_exit_do(os.remove, path)
+
+        with closing(io.StringIO()) as pbar_out:
+            pbar = tqdm(file=pbar_out, mininterval=0)
+            self.cli.tasks_upload(self.task_id, 'COCO 1.0', path, pbar=pbar)
+
+        pbar_out = pbar_out.getvalue().strip('\r').split('\r')
+
         self.assertRegex(self.mock_stdout.getvalue(), '.*{}.*'.format("annotation file"))
-        os.remove(path)
+        self.assertRegex(pbar_out[-1], '100%')
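
For reference, the progress reporting added by this patch can also be driven from Python code rather than the cli.py entry point, mirroring what the updated tests do with an explicit tqdm instance. A minimal sketch, assuming a running CVAT server; the host, credentials, task id, and output path below are placeholders:

    import requests
    from tqdm import tqdm

    from utils.cli.core import CLI, CVAT_API_V2

    api = CVAT_API_V2('localhost:8080')                # placeholder host
    with requests.Session() as session:
        cli = CLI(session, api, ('user', 'password'))  # placeholder credentials

        # A custom tqdm instance may be passed via the new pbar keyword;
        # when omitted, the methods create a byte-scaled bar via _make_pbar().
        with tqdm(unit_scale=True, unit='B', unit_divisor=1024) as pbar:
            # Note: _download_file() refuses to overwrite an existing output file.
            cli.tasks_dump(1, 'CVAT for images 1.1', 'annotations.zip', pbar=pbar)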