Make CLI code nicer (#60)

* Return created task id from function

* Add pbar for annotation dump

* Add testing requirements list

* Remove resources properly in tests

* Add backup dump progress bar

* Refactor code

* Set up logging in CLI

* Add annotations uploading  progress with tus

* Refactor code

* Add tqdm dependency

* Update changelog

* Add some comments to the implementation

* Remove extra code

* Update ci container

* Add progress bars to task import

* Add tests, refactor code

* Add progressbar for task creation

* Remove extra line

* Change exception type

* Move requirements files

* Fix dockerfile

* Revert extra change

* Isolate test directories

* Move cli package

* Update cli package references

* Move package files into a directory

* Move files

* Update requirements and dockerfiles

* Add cvat-cli package

* Autoformat CLI code

* Add developer guide

* Update readme

* Add Black check on CI

* Add isort check on CI

* Merge branch 'develop' into zm/cli-package

* Update package

* Change paths in cli code

* Move files

* Update docs

* Update dockerfile

* Update changelog

* Fix linter issues

* Fix linter issues

* Add dev requirements

* Update ci

Co-authored-by: Nikita Manovich <nikita.manovich@gmail.com>
main
Maxim Zhiltsov 4 years ago committed by GitHub
parent ca8150e21c
commit 26d78fe12e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,48 @@
name: Linter
on: pull_request
jobs:
Black:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- id: files
uses: jitterbit/get-changed-files@v1
continue-on-error: true
- name: Run checks
env:
PR_FILES_AM: ${{ steps.files.outputs.added_modified }}
PR_FILES_RENAMED: ${{ steps.files.outputs.renamed }}
run: |
PR_FILES="$PR_FILES_AM $PR_FILES_RENAMED"
for FILE in $PR_FILES; do
EXTENSION="${FILE##*.}"
DIRECTORY="${FILE%%/*}"
if [[ "$EXTENSION" == "py" && "$DIRECTORY" == "cvat-cli" ]]; then
CHANGED_FILES+=" $FILE"
fi
done
if [[ ! -z $CHANGED_FILES ]]; then
sudo apt-get --no-install-recommends install -y build-essential curl python3-dev python3-pip python3-venv
python3 -m venv .env
. .env/bin/activate
pip install -U pip wheel setuptools
pip install $(egrep "black.*" ./cvat-cli/requirements/development.txt)
mkdir -p black_report
echo "Black version: "$(black --version)
echo "The files will be checked: "$(echo $CHANGED_FILES)
black --check --config ./cvat-cli/pyproject.toml $CHANGED_FILES > ./black_report/black_checks.txt || EXIT_CODE=$(echo $?) || true
deactivate
exit $EXIT_CODE
else
echo "No files with the \"py\" extension found"
fi
- name: Upload artifacts
if: failure()
uses: actions/upload-artifact@v2
with:
name: black_report
path: black_report

@ -0,0 +1,48 @@
name: Linter
on: pull_request
jobs:
isort:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- id: files
uses: jitterbit/get-changed-files@v1
continue-on-error: true
- name: Run checks
env:
PR_FILES_AM: ${{ steps.files.outputs.added_modified }}
PR_FILES_RENAMED: ${{ steps.files.outputs.renamed }}
run: |
PR_FILES="$PR_FILES_AM $PR_FILES_RENAMED"
for FILE in $PR_FILES; do
EXTENSION="${FILE##*.}"
DIRECTORY="${FILE%%/*}"
if [[ "$EXTENSION" == "py" && "$DIRECTORY" == "cvat-cli" ]]; then
CHANGED_FILES+=" $FILE"
fi
done
if [[ ! -z $CHANGED_FILES ]]; then
sudo apt-get --no-install-recommends install -y build-essential curl python3-dev python3-pip python3-venv
python3 -m venv .env
. .env/bin/activate
pip install -U pip wheel setuptools
pip install $(egrep "isort.*" ./cvat-cli/requirements/development.txt)
mkdir -p isort_report
echo "isort version: "$(isort --version)
echo "The files will be checked: "$(echo $CHANGED_FILES)
isort --check --sp ./cvat-cli/pyproject.toml $CHANGED_FILES > ./isort_report/isort_checks.txt || EXIT_CODE=$(echo $?) || true
deactivate
exit $EXIT_CODE
else
echo "No files with the \"py\" extension found"
fi
- name: Upload artifacts
if: failure()
uses: actions/upload-artifact@v2
with:
name: isort_report
path: isort_report

@ -2,3 +2,11 @@
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.isort]
profile = "black"
forced_separate = ["tests"]
line_length = 100
[tool.black]
line-length = 100
target-version = ['py38']

@ -0,0 +1,5 @@
-r base.txt
black>=22.1.0
isort>=5.10.1
pylint>=2.7.0

@ -5,7 +5,7 @@
import os.path as osp
import re
import setuptools
from setuptools import find_packages, setup
def find_version(project_dir=None):
@ -42,7 +42,7 @@ BASE_REQUIREMENTS = parse_requirements(BASE_REQUIREMENTS_FILE)
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
setup(
name="cvat-cli",
version=find_version(project_dir="src/cvat_cli"),
description="Command-line client for CVAT",
@ -50,7 +50,7 @@ setuptools.setup(
long_description_content_type="text/markdown",
url="https://github.com/openvinotoolkit/cvat/",
package_dir={"": "src"},
packages=setuptools.find_packages(where='src'),
packages=find_packages(where="src"),
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",

@ -3,18 +3,22 @@
# SPDX-License-Identifier: MIT
import logging
import requests
import sys
from http.client import HTTPConnection
import requests
from cvat_cli.core.core import CLI, CVAT_API_V2
from cvat_cli.core.definition import parser
log = logging.getLogger(__name__)
def config_log(level):
log = logging.getLogger('core')
formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S', style='%')
log = logging.getLogger("core")
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", style="%"
)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
log.addHandler(handler)
@ -25,29 +29,31 @@ def config_log(level):
def main():
actions = {
'create': CLI.tasks_create,
'delete': CLI.tasks_delete,
'ls': CLI.tasks_list,
'frames': CLI.tasks_frame,
'dump': CLI.tasks_dump,
'upload': CLI.tasks_upload,
'export': CLI.tasks_export,
'import': CLI.tasks_import,
"create": CLI.tasks_create,
"delete": CLI.tasks_delete,
"ls": CLI.tasks_list,
"frames": CLI.tasks_frame,
"dump": CLI.tasks_dump,
"upload": CLI.tasks_upload,
"export": CLI.tasks_export,
"import": CLI.tasks_import,
}
args = parser.parse_args()
config_log(args.loglevel)
with requests.Session() as session:
api = CVAT_API_V2('%s:%s' % (args.server_host, args.server_port), args.https)
api = CVAT_API_V2("%s:%s" % (args.server_host, args.server_port), args.https)
cli = CLI(session, api, args.auth)
try:
actions[args.action](cli, **args.__dict__)
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError,
requests.exceptions.RequestException) as e:
except (
requests.exceptions.HTTPError,
requests.exceptions.ConnectionError,
requests.exceptions.RequestException,
) as e:
log.critical(e)
return 0
if __name__ == '__main__':
if __name__ == "__main__":
sys.exit(main())

@ -2,5 +2,5 @@
#
# SPDX-License-Identifier: MIT
from .definition import parser, ResourceType # noqa
from .core import CLI, CVAT_API_V2 # noqa
from .definition import ResourceType, parser # noqa

@ -3,26 +3,26 @@
# SPDX-License-Identifier: MIT
from __future__ import annotations
from contextlib import ExitStack, closing
from typing import Dict, List, Optional, Sequence, Tuple
import tqdm
import json
import logging
import mimetypes
import os
import os.path as osp
import requests
from contextlib import ExitStack, closing
from io import BytesIO
import mimetypes
from time import sleep
from typing import Dict, List, Optional, Sequence, Tuple
import requests
import tqdm
from PIL import Image
from tusclient import client
from tusclient import uploader
from tusclient import client, uploader
from tusclient.request import TusRequest, TusUploadFailed
from .utils import StreamWithProgress, expect_status
from .definition import ResourceType
from .utils import StreamWithProgress, expect_status
log = logging.getLogger(__name__)
@ -32,9 +32,16 @@ class CLI:
self.session = session
self.login(credentials)
def tasks_data(self, task_id: int, resource_type: ResourceType,
resources: Sequence[str], *, pbar: tqdm.tqdm = None, **kwargs) -> None:
""" Add local, remote, or shared files to an existing task. """
def tasks_data(
self,
task_id: int,
resource_type: ResourceType,
resources: Sequence[str],
*,
pbar: tqdm.tqdm = None,
**kwargs,
) -> None:
"""Add local, remote, or shared files to an existing task."""
url = self.api.tasks_id_data(task_id)
data = {}
@ -69,19 +76,27 @@ class CLI:
if current_group:
bulk_file_groups.append((current_group, current_group_size))
elif resource_type == ResourceType.REMOTE:
data = {'remote_files[{}]'.format(i): f for i, f in enumerate(resources)}
data = {"remote_files[{}]".format(i): f for i, f in enumerate(resources)}
elif resource_type == ResourceType.SHARE:
data = {'server_files[{}]'.format(i): f for i, f in enumerate(resources)}
data = {"server_files[{}]".format(i): f for i, f in enumerate(resources)}
data['image_quality'] = 70
data["image_quality"] = 70
## capture additional kwargs
for flag in ['chunk_size', 'copy_data', 'image_quality', 'sorting_method',
'start_frame', 'stop_frame', 'use_cache', 'use_zip_chunks']:
for flag in [
"chunk_size",
"copy_data",
"image_quality",
"sorting_method",
"start_frame",
"stop_frame",
"use_cache",
"use_zip_chunks",
]:
if kwargs.get(flag) is not None:
data[flag] = kwargs.get(flag)
if kwargs.get('frame_step') is not None:
data['frame_filter'] = f"step={kwargs.get('frame_step')}"
if kwargs.get("frame_step") is not None:
data["frame_filter"] = f"step={kwargs.get('frame_step')}"
if resource_type in [ResourceType.REMOTE, ResourceType.SHARE]:
response = self.session.post(url, data=data)
@ -99,25 +114,25 @@ class CLI:
with ExitStack() as es:
group_files = {}
for i, filename in enumerate(group):
group_files[f'client_files[{i}]'] = (
group_files[f"client_files[{i}]"] = (
filename,
es.enter_context(closing(open(filename, 'rb')))
es.enter_context(closing(open(filename, "rb"))),
)
response = self.session.post(url, data=data,
files=group_files, headers={'Upload-Multiple': ''})
response = self.session.post(
url, data=data, files=group_files, headers={"Upload-Multiple": ""}
)
expect_status(200, response)
if pbar is not None:
pbar.update(group_size)
for filename in separate_files:
self._upload_file_with_tus(url, filename,
pbar=pbar, logger=log.debug)
self._upload_file_with_tus(url, filename, pbar=pbar, logger=log.debug)
self._tus_finish_upload(url, data=data)
def tasks_list(self, use_json_output, **kwargs):
""" List all tasks in either basic or JSON format. """
"""List all tasks in either basic or JSON format."""
url = self.api.tasks
response = self.session.get(url)
response.raise_for_status()
@ -126,13 +141,13 @@ class CLI:
json_data_list = []
while True:
response_json = response.json()
output += response_json['results']
for r in response_json['results']:
output += response_json["results"]
for r in response_json["results"]:
if use_json_output:
json_data_list.append(r)
else:
log.info('{id},{name},{status}'.format(**r))
if not response_json['next']:
log.info("{id},{name},{status}".format(**r))
if not response_json["next"]:
log.info(json.dumps(json_data_list, indent=4))
return output
page += 1
@ -141,13 +156,22 @@ class CLI:
response.raise_for_status()
return output
def tasks_create(self, name: str, labels: List[Dict[str, str]],
resource_type: ResourceType, resources: Sequence[str], *,
annotation_path='', annotation_format='CVAT XML 1.1',
completion_verification_period=20,
git_completion_verification_period=2,
dataset_repository_url='',
lfs=False, pbar: tqdm.tqdm = None, **kwargs) -> int:
def tasks_create(
self,
name: str,
labels: List[Dict[str, str]],
resource_type: ResourceType,
resources: Sequence[str],
*,
annotation_path="",
annotation_format="CVAT XML 1.1",
completion_verification_period=20,
git_completion_verification_period=2,
dataset_repository_url="",
lfs=False,
pbar: tqdm.tqdm = None,
**kwargs,
) -> int:
"""
Create a new task with the given name and labels JSON and
add the files to it.
@ -156,105 +180,109 @@ class CLI:
"""
url = self.api.tasks
labels = [] if kwargs.get('project_id') is not None else labels
data = {'name': name,
'labels': labels
}
labels = [] if kwargs.get("project_id") is not None else labels
data = {"name": name, "labels": labels}
for flag in ['bug_tracker', 'overlap', 'project_id', 'segment_size']:
for flag in ["bug_tracker", "overlap", "project_id", "segment_size"]:
if kwargs.get(flag) is not None:
data[flag] = kwargs.get(flag)
response = self.session.post(url, json=data)
response.raise_for_status()
response_json = response.json()
log.info('Created task ID: {id} NAME: {name}'.format(**response_json))
log.info("Created task ID: {id} NAME: {name}".format(**response_json))
task_id = response_json['id']
task_id = response_json["id"]
assert isinstance(task_id, int)
self.tasks_data(task_id, resource_type, resources, pbar=pbar, **kwargs)
if annotation_path != '':
if annotation_path != "":
url = self.api.tasks_id_status(task_id)
response = self.session.get(url)
response_json = response.json()
log.info('Awaiting data compression before uploading annotations...')
while response_json['state'] != 'Finished':
log.info("Awaiting data compression before uploading annotations...")
while response_json["state"] != "Finished":
sleep(completion_verification_period)
response = self.session.get(url)
response_json = response.json()
logger_string= '''Awaiting compression for task {}.
Status={}, Message={}'''.format(task_id,
response_json['state'],
response_json['message'])
logger_string = """Awaiting compression for task {}.
Status={}, Message={}""".format(
task_id, response_json["state"], response_json["message"]
)
log.info(logger_string)
self.tasks_upload(task_id, annotation_format, annotation_path, pbar=pbar, **kwargs)
if dataset_repository_url:
response = self.session.post(
self.api.git_create(task_id),
json={
'path': dataset_repository_url,
'lfs': lfs,
'tid': task_id})
self.api.git_create(task_id),
json={"path": dataset_repository_url, "lfs": lfs, "tid": task_id},
)
response_json = response.json()
rq_id = response_json['rq_id']
rq_id = response_json["rq_id"]
log.info(f"Create RQ ID: {rq_id}")
check_url = self.api.git_check(rq_id)
response = self.session.get(check_url)
response_json = response.json()
while response_json['status'] != 'finished':
log.info('''Awaiting a dataset repository to be created for the task. Response status: {}'''.format(
response_json['status']))
while response_json["status"] != "finished":
log.info(
"""Awaiting a dataset repository to be created for the task. Response status: {}""".format(
response_json["status"]
)
)
sleep(git_completion_verification_period)
response = self.session.get(check_url)
response_json = response.json()
if response_json['status'] == 'failed' or response_json['status'] == 'unknown':
log.error(f'Dataset repository creation request for task {task_id} failed'
f'with status {response_json["status"]}.')
if response_json["status"] == "failed" or response_json["status"] == "unknown":
log.error(
f"Dataset repository creation request for task {task_id} failed"
f'with status {response_json["status"]}.'
)
break
log.info(f"Dataset repository creation completed with status: {response_json['status']}.")
log.info(
f"Dataset repository creation completed with status: {response_json['status']}."
)
return task_id
def tasks_delete(self, task_ids, **kwargs):
""" Delete a list of tasks, ignoring those which don't exist. """
"""Delete a list of tasks, ignoring those which don't exist."""
for task_id in task_ids:
url = self.api.tasks_id(task_id)
response = self.session.delete(url)
try:
response.raise_for_status()
log.info('Task ID {} deleted'.format(task_id))
log.info("Task ID {} deleted".format(task_id))
except requests.exceptions.HTTPError as e:
if response.status_code == 404:
log.info('Task ID {} not found'.format(task_id))
log.info("Task ID {} not found".format(task_id))
else:
raise e
def tasks_frame(self, task_id, frame_ids, outdir='', quality='original', **kwargs):
""" Download the requested frame numbers for a task and save images as
def tasks_frame(self, task_id, frame_ids, outdir="", quality="original", **kwargs):
"""Download the requested frame numbers for a task and save images as
task_<ID>_frame_<FRAME>.jpg."""
for frame_id in frame_ids:
url = self.api.tasks_id_frame_id(task_id, frame_id, quality)
response = self.session.get(url)
response.raise_for_status()
im = Image.open(BytesIO(response.content))
mime_type = im.get_format_mimetype() or 'image/jpg'
mime_type = im.get_format_mimetype() or "image/jpg"
im_ext = mimetypes.guess_extension(mime_type)
# FIXME It is better to use meta information from the server
# to determine the extension
# replace '.jpe' or '.jpeg' with a more used '.jpg'
if im_ext in ('.jpe', '.jpeg', None):
im_ext = '.jpg'
if im_ext in (".jpe", ".jpeg", None):
im_ext = ".jpg"
outfile = 'task_{}_frame_{:06d}{}'.format(task_id, frame_id, im_ext)
outfile = "task_{}_frame_{:06d}{}".format(task_id, frame_id, im_ext)
im.save(os.path.join(outdir, outfile))
def tasks_dump(self, task_id, fileformat, filename, *,
pbar=None, completion_check_period=2, **kwargs) -> None:
def tasks_dump(
self, task_id, fileformat, filename, *, pbar=None, completion_check_period=2, **kwargs
) -> None:
"""
Download annotations for a task in the specified format (e.g. 'YOLO ZIP 1.0').
"""
@ -264,23 +292,21 @@ class CLI:
response.raise_for_status()
response_json = response.json()
url = self.api.tasks_id_annotations_filename(task_id,
response_json['name'],
fileformat)
url = self.api.tasks_id_annotations_filename(task_id, response_json["name"], fileformat)
log.info("Waiting for the server to prepare the file...")
while True:
response = self.session.get(url)
response.raise_for_status()
log.info('STATUS {}'.format(response.status_code))
log.info("STATUS {}".format(response.status_code))
if response.status_code == 201:
break
sleep(completion_check_period)
if pbar is None:
pbar = self._make_pbar("Downloading")
self._download_file(url + '&action=download', output_path=filename, pbar=pbar)
self._download_file(url + "&action=download", output_path=filename, pbar=pbar)
log.info(f"Annotations have been exported to {filename}")
@ -309,13 +335,15 @@ class CLI:
Makes request to tus server to create a new upload url for the required file upload.
"""
headers = self.headers
headers['upload-length'] = str(self.file_size)
headers['upload-metadata'] = ','.join(self.encode_metadata())
headers['origin'] = cli.api.host # required by CVAT server
headers["upload-length"] = str(self.file_size)
headers["upload-metadata"] = ",".join(self.encode_metadata())
headers["origin"] = cli.api.host # required by CVAT server
resp = self._session.post(self.client.url, headers=headers)
url = resp.headers.get("location")
if url is None:
msg = 'Attempt to retrieve create file url with status {}'.format(resp.status_code)
msg = "Attempt to retrieve create file url with status {}".format(
resp.status_code
)
raise uploader.TusCommunicationError(msg, resp.status_code, resp.content)
return uploader.urljoin(self.client.url, url)
@ -328,9 +356,9 @@ class CLI:
http request to the tus server to retrieve the offset.
"""
resp = self._session.head(self.url, headers=self.headers)
offset = resp.headers.get('upload-offset')
offset = resp.headers.get("upload-offset")
if offset is None:
msg = 'Attempt to retrieve offset fails with status {}'.format(resp.status_code)
msg = "Attempt to retrieve offset fails with status {}".format(resp.status_code)
raise uploader.TusCommunicationError(msg, resp.status_code, resp.content)
return int(offset)
@ -342,40 +370,44 @@ class CLI:
file_size = os.stat(filename).st_size
with open(filename, 'rb') as input_file:
with open(filename, "rb") as input_file:
if pbar is not None:
input_file = StreamWithProgress(input_file, pbar, length=file_size)
tus_uploader = self._make_tus_uploader(url + '/', metadata=params,
file_stream=input_file, chunk_size=CHUNK_SIZE, log_func=logger)
tus_uploader = self._make_tus_uploader(
url + "/",
metadata=params,
file_stream=input_file,
chunk_size=CHUNK_SIZE,
log_func=logger,
)
tus_uploader.upload()
def _upload_file_with_tus(self, url, filename, *, params=None, pbar=None, logger=None):
# "CVAT-TUS" protocol has 2 extra messages
self._tus_start_upload(url, params=params)
self._upload_file_data_with_tus(url=url, filename=filename,
params=params, pbar=pbar, logger=logger)
self._upload_file_data_with_tus(
url=url, filename=filename, params=params, pbar=pbar, logger=logger
)
return self._tus_finish_upload(url, params=params)
def _tus_start_upload(self, url, *, params=None):
response = self.session.post(url, headers={'Upload-Start': ''}, params=params)
response = self.session.post(url, headers={"Upload-Start": ""}, params=params)
expect_status(202, response)
return response
def _tus_finish_upload(self, url, *, params=None, data=None):
response = self.session.post(url, headers={'Upload-Finish': ''}, params=params, data=data)
response = self.session.post(url, headers={"Upload-Finish": ""}, params=params, data=data)
expect_status(202, response)
return response
def tasks_upload(self, task_id, fileformat, filename, *,
completion_check_period=2, pbar=None, **kwargs):
""" Upload annotations for a task in the specified format
def tasks_upload(
self, task_id, fileformat, filename, *, completion_check_period=2, pbar=None, **kwargs
):
"""Upload annotations for a task in the specified format
(e.g. 'YOLO ZIP 1.0')."""
url = self.api.tasks_id_annotations(task_id)
params = {
'format': fileformat,
'filename': os.path.basename(filename)
}
params = {"format": fileformat, "filename": os.path.basename(filename)}
if pbar is None:
pbar = self._make_pbar("Uploading...")
@ -390,75 +422,68 @@ class CLI:
sleep(completion_check_period)
log.info(f"Upload job for Task ID {task_id} "
f"with annotation file {filename} finished")
log.info(f"Upload job for Task ID {task_id} " f"with annotation file {filename} finished")
def tasks_export(self, task_id, filename, *,
completion_check_period=2, pbar=None, **kwargs):
""" Download a task backup """
def tasks_export(self, task_id, filename, *, completion_check_period=2, pbar=None, **kwargs):
"""Download a task backup"""
log.info("Waiting for the server to prepare the file...")
url = self.api.tasks_id_backup(task_id)
while True:
response = self.session.get(url)
response.raise_for_status()
log.info('STATUS {}'.format(response.status_code))
log.info("STATUS {}".format(response.status_code))
if response.status_code == 201:
break
sleep(completion_check_period)
if pbar is None:
pbar = self._make_pbar("Downloading")
self._download_file(url + '?action=download', output_path=filename, pbar=pbar)
self._download_file(url + "?action=download", output_path=filename, pbar=pbar)
log.info(f"Task {task_id} has been exported sucessfully "
f"to {os.path.abspath(filename)}")
log.info(f"Task {task_id} has been exported sucessfully " f"to {os.path.abspath(filename)}")
def tasks_import(self, filename, *,
completion_check_period=2, pbar=None, **kwargs) -> None:
""" Import a task from a backup file"""
def tasks_import(self, filename, *, completion_check_period=2, pbar=None, **kwargs) -> None:
"""Import a task from a backup file"""
url = self.api.tasks_backup()
params = {
'filename': os.path.basename(filename)
}
params = {"filename": os.path.basename(filename)}
if pbar is None:
pbar = self._make_pbar("Uploading...")
response = self._upload_file_with_tus(url, filename,
params=params, pbar=pbar, logger=log.debug)
response = self._upload_file_with_tus(
url, filename, params=params, pbar=pbar, logger=log.debug
)
response_json = response.json()
rq_id = response_json['rq_id']
rq_id = response_json["rq_id"]
# check task status
while True:
sleep(completion_check_period)
response = self.session.post(
url,
data={'rq_id': rq_id}
)
response = self.session.post(url, data={"rq_id": rq_id})
if response.status_code == 201:
break
expect_status(202, response)
task_id = response.json()['id']
task_id = response.json()["id"]
log.info(f"Task has been imported sucessfully. Task ID: {task_id}")
def login(self, credentials):
url = self.api.login
auth = {'username': credentials[0], 'password': credentials[1]}
auth = {"username": credentials[0], "password": credentials[1]}
response = self.session.post(url, auth)
response.raise_for_status()
if 'csrftoken' in response.cookies:
self.session.headers['X-CSRFToken'] = response.cookies['csrftoken']
if "csrftoken" in response.cookies:
self.session.headers["X-CSRFToken"] = response.cookies["csrftoken"]
def _make_pbar(self, title: str = None) -> tqdm.tqdm:
return tqdm.tqdm(unit_scale=True, unit='B', unit_divisor=1024, desc=title)
return tqdm.tqdm(unit_scale=True, unit="B", unit_divisor=1024, desc=title)
def _download_file(self, url: str, output_path: str, *,
timeout: int = 60, pbar: Optional[tqdm.tqdm] = None) -> None:
def _download_file(
self, url: str, output_path: str, *, timeout: int = 60, pbar: Optional[tqdm.tqdm] = None
) -> None:
"""
Downloads the file from url into a temporary file, then renames it
to the requested name.
@ -478,7 +503,7 @@ class CLI:
response.raise_for_status()
try:
file_size = int(response.headers.get('Content-Length', 0))
file_size = int(response.headers.get("Content-Length", 0))
except (ValueError, KeyError):
file_size = None
@ -504,61 +529,63 @@ class CLI:
class CVAT_API_V2:
""" Build parameterized API URLs """
"""Build parameterized API URLs"""
def __init__(self, host, https=False):
if host.startswith('https://'):
if host.startswith("https://"):
https = True
if host.startswith('http://') or host.startswith('https://'):
host = host.replace('http://', '')
host = host.replace('https://', '')
scheme = 'https' if https else 'http'
self.host = '{}://{}'.format(scheme, host)
self.base = self.host + '/api/'
self.git = f'{scheme}://{host}/git/repository/'
if host.startswith("http://") or host.startswith("https://"):
host = host.replace("http://", "")
host = host.replace("https://", "")
scheme = "https" if https else "http"
self.host = "{}://{}".format(scheme, host)
self.base = self.host + "/api/"
self.git = f"{scheme}://{host}/git/repository/"
@property
def tasks(self):
return self.base + 'tasks'
return self.base + "tasks"
def tasks_page(self, page_id):
return self.tasks + '?page={}'.format(page_id)
return self.tasks + "?page={}".format(page_id)
def tasks_backup(self):
return self.tasks + '/backup'
return self.tasks + "/backup"
def tasks_id(self, task_id):
return self.tasks + '/{}'.format(task_id)
return self.tasks + "/{}".format(task_id)
def tasks_id_data(self, task_id):
return self.tasks_id(task_id) + '/data'
return self.tasks_id(task_id) + "/data"
def tasks_id_frame_id(self, task_id, frame_id, quality):
return self.tasks_id(task_id) + '/data?type=frame&number={}&quality={}'.format(frame_id, quality)
return self.tasks_id(task_id) + "/data?type=frame&number={}&quality={}".format(
frame_id, quality
)
def tasks_id_status(self, task_id):
return self.tasks_id(task_id) + '/status'
return self.tasks_id(task_id) + "/status"
def tasks_id_backup(self, task_id):
return self.tasks_id(task_id) + '/backup'
return self.tasks_id(task_id) + "/backup"
def tasks_id_annotations(self, task_id):
return self.tasks_id(task_id) + '/annotations'
return self.tasks_id(task_id) + "/annotations"
def tasks_id_annotations_format(self, task_id, fileformat):
return self.tasks_id_annotations(task_id) + '?format={}' \
.format(fileformat)
return self.tasks_id_annotations(task_id) + "?format={}".format(fileformat)
def tasks_id_annotations_filename(self, task_id, name, fileformat):
return self.tasks_id_annotations(task_id) + '?format={}&filename={}' \
.format(fileformat, name)
return self.tasks_id_annotations(task_id) + "?format={}&filename={}".format(
fileformat, name
)
def git_create(self, task_id):
return self.git + f'create/{task_id}'
return self.git + f"create/{task_id}"
def git_check(self, rq_id):
return self.git + f'check/{rq_id}'
return self.git + f"check/{rq_id}"
@property
def login(self):
return self.base + 'auth/login'
return self.base + "auth/login"

@ -7,21 +7,22 @@ import json
import logging
import os
from enum import Enum
from ..version import VERSION
def get_auth(s):
""" Parse USER[:PASS] strings and prompt for password if none was
supplied. """
user, _, password = s.partition(':')
password = password or os.environ.get('PASS') or getpass.getpass()
"""Parse USER[:PASS] strings and prompt for password if none was
supplied."""
user, _, password = s.partition(":")
password = password or os.environ.get("PASS") or getpass.getpass()
return user, password
def parse_label_arg(s):
""" If s is a file load it as JSON, otherwise parse s as JSON."""
"""If s is a file load it as JSON, otherwise parse s as JSON."""
if os.path.exists(s):
fp = open(s, 'r')
fp = open(s, "r")
return json.load(fp)
else:
return json.loads(s)
@ -51,51 +52,41 @@ class ResourceType(Enum):
# Command line interface definition
#######################################################################
parser = argparse.ArgumentParser(
description='Perform common operations related to CVAT tasks.\n\n'
)
parser = argparse.ArgumentParser(description="Perform common operations related to CVAT tasks.\n\n")
parser.add_argument("--version", action="version", version=VERSION)
task_subparser = parser.add_subparsers(dest='action')
task_subparser = parser.add_subparsers(dest="action")
#######################################################################
# Positional arguments
#######################################################################
parser.add_argument(
'--auth',
"--auth",
type=get_auth,
metavar='USER:[PASS]',
metavar="USER:[PASS]",
default=getpass.getuser(),
help='''defaults to the current user and supports the PASS
help="""defaults to the current user and supports the PASS
environment variable or password prompt
(default user: %(default)s).'''
)
parser.add_argument(
'--server-host',
type=str,
default='localhost',
help='host (default: %(default)s)'
(default user: %(default)s).""",
)
parser.add_argument(
'--server-port',
type=int,
default='8080',
help='port (default: %(default)s)'
"--server-host", type=str, default="localhost", help="host (default: %(default)s)"
)
parser.add_argument("--server-port", type=int, default="8080", help="port (default: %(default)s)")
parser.add_argument(
'--https',
"--https",
default=False,
action='store_true',
help='using https connection (default: %(default)s)'
action="store_true",
help="using https connection (default: %(default)s)",
)
parser.add_argument(
'--debug',
action='store_const',
dest='loglevel',
"--debug",
action="store_const",
dest="loglevel",
const=logging.DEBUG,
default=logging.INFO,
help='show debug output'
help="show debug output",
)
#######################################################################
@ -103,177 +94,134 @@ parser.add_argument(
#######################################################################
task_create_parser = task_subparser.add_parser(
'create',
description='''Create a new CVAT task. To create a task, you need
"create",
description="""Create a new CVAT task. To create a task, you need
to specify labels using the --labels argument or
attach the task to an existing project using the
--project_id argument.'''
--project_id argument.""",
)
task_create_parser.add_argument("name", type=str, help="name of the task")
task_create_parser.add_argument(
'name',
type=str,
help='name of the task'
)
task_create_parser.add_argument(
'resource_type',
default='local',
"resource_type",
default="local",
choices=list(ResourceType),
type=ResourceType.argparse,
help='type of files specified'
help="type of files specified",
)
task_create_parser.add_argument("resources", type=str, help="list of paths or URLs", nargs="+")
task_create_parser.add_argument(
'resources',
type=str,
help='list of paths or URLs',
nargs='+'
"--annotation_path", default="", type=str, help="path to annotation file"
)
task_create_parser.add_argument(
'--annotation_path',
default='',
"--annotation_format",
default="CVAT 1.1",
type=str,
help='path to annotation file'
help="format of the annotation file being uploaded, e.g. CVAT 1.1",
)
task_create_parser.add_argument(
'--annotation_format',
default='CVAT 1.1',
type=str,
help='format of the annotation file being uploaded, e.g. CVAT 1.1'
"--bug_tracker", "--bug", default=None, type=str, help="bug tracker URL"
)
task_create_parser.add_argument(
'--bug_tracker', '--bug',
default=None,
type=str,
help='bug tracker URL'
"--chunk_size", default=None, type=int, help="the number of frames per chunk"
)
task_create_parser.add_argument(
'--chunk_size',
default=None,
type=int,
help='the number of frames per chunk'
)
task_create_parser.add_argument(
'--completion_verification_period',
"--completion_verification_period",
default=20,
type=int,
help='''number of seconds to wait until checking
if data compression finished (necessary before uploading annotations)'''
help="""number of seconds to wait until checking
if data compression finished (necessary before uploading annotations)""",
)
task_create_parser.add_argument(
'--copy_data',
"--copy_data",
default=False,
action='store_true',
help='''set the option to copy the data, only used when resource type is
share (default: %(default)s)'''
action="store_true",
help="""set the option to copy the data, only used when resource type is
share (default: %(default)s)""",
)
task_create_parser.add_argument(
'--dataset_repository_url',
default='',
"--dataset_repository_url",
default="",
type=str,
help=('git repository to store annotations e.g.'
' https://github.com/user/repos [annotation/<anno_file_name.zip>]')
help=(
"git repository to store annotations e.g."
" https://github.com/user/repos [annotation/<anno_file_name.zip>]"
),
)
task_create_parser.add_argument(
'--frame_step',
"--frame_step",
default=None,
type=int,
help='''set the frame step option in the advanced configuration
when uploading image series or videos (default: %(default)s)'''
help="""set the frame step option in the advanced configuration
when uploading image series or videos (default: %(default)s)""",
)
task_create_parser.add_argument(
'--image_quality',
"--image_quality",
default=70,
type=int,
help='''set the image quality option in the advanced configuration
when creating tasks.(default: %(default)s)'''
help="""set the image quality option in the advanced configuration
when creating tasks.(default: %(default)s)""",
)
task_create_parser.add_argument(
'--labels',
default='[]',
"--labels",
default="[]",
type=parse_label_arg,
help='string or file containing JSON labels specification'
help="string or file containing JSON labels specification",
)
task_create_parser.add_argument(
'--lfs',
"--lfs",
default=False,
action='store_true',
help='using lfs for dataset repository (default: %(default)s)'
action="store_true",
help="using lfs for dataset repository (default: %(default)s)",
)
task_create_parser.add_argument(
'--project_id',
default=None,
type=int,
help='project ID if project exists'
"--project_id", default=None, type=int, help="project ID if project exists"
)
task_create_parser.add_argument(
'--overlap',
"--overlap",
default=None,
type=int,
help='the number of intersected frames between different segments'
help="the number of intersected frames between different segments",
)
task_create_parser.add_argument(
'--segment_size',
default=None,
type=int,
help='the number of frames in a segment'
"--segment_size", default=None, type=int, help="the number of frames in a segment"
)
task_create_parser.add_argument(
'--sorting-method',
default='lexicographical',
choices=['lexicographical', 'natural', 'predefined', 'random'],
help='''data soring method (default: %(default)s)'''
"--sorting-method",
default="lexicographical",
choices=["lexicographical", "natural", "predefined", "random"],
help="""data soring method (default: %(default)s)""",
)
task_create_parser.add_argument(
'--start_frame',
default=None,
type=int,
help='the start frame of the video'
"--start_frame", default=None, type=int, help="the start frame of the video"
)
task_create_parser.add_argument(
'--stop_frame',
default=None,
type=int,
help='the stop frame of the video'
"--stop_frame", default=None, type=int, help="the stop frame of the video"
)
task_create_parser.add_argument(
'--use_cache',
action='store_true', # automatically sets default=False
help='''use cache'''
"--use_cache", action="store_true", help="""use cache""" # automatically sets default=False
)
task_create_parser.add_argument(
'--use_zip_chunks',
action='store_true', # automatically sets default=False
help='''zip chunks before sending them to the server'''
"--use_zip_chunks",
action="store_true", # automatically sets default=False
help="""zip chunks before sending them to the server""",
)
#######################################################################
# Delete
#######################################################################
delete_parser = task_subparser.add_parser(
'delete',
description='Delete a CVAT task.'
)
delete_parser.add_argument(
'task_ids',
type=int,
help='list of task IDs',
nargs='+'
)
delete_parser = task_subparser.add_parser("delete", description="Delete a CVAT task.")
delete_parser.add_argument("task_ids", type=int, help="list of task IDs", nargs="+")
#######################################################################
# List
#######################################################################
ls_parser = task_subparser.add_parser(
'ls',
description='List all CVAT tasks in simple or JSON format.'
"ls", description="List all CVAT tasks in simple or JSON format."
)
ls_parser.add_argument(
'--json',
dest='use_json_output',
default=False,
action='store_true',
help='output JSON data'
"--json", dest="use_json_output", default=False, action="store_true", help="output JSON data"
)
#######################################################################
@ -281,58 +229,34 @@ ls_parser.add_argument(
#######################################################################
frames_parser = task_subparser.add_parser(
'frames',
description='Download all frame images for a CVAT task.'
"frames", description="Download all frame images for a CVAT task."
)
frames_parser.add_argument("task_id", type=int, help="task ID")
frames_parser.add_argument("frame_ids", type=int, help="list of frame IDs to download", nargs="+")
frames_parser.add_argument(
'task_id',
type=int,
help='task ID'
)
frames_parser.add_argument(
'frame_ids',
type=int,
help='list of frame IDs to download',
nargs='+'
)
frames_parser.add_argument(
'--outdir',
type=str,
default='',
help='directory to save images (default: CWD)'
"--outdir", type=str, default="", help="directory to save images (default: CWD)"
)
frames_parser.add_argument(
'--quality',
"--quality",
type=str,
choices=('original', 'compressed'),
default='original',
help='choose quality of images (default: %(default)s)'
choices=("original", "compressed"),
default="original",
help="choose quality of images (default: %(default)s)",
)
#######################################################################
# Dump
#######################################################################
dump_parser = task_subparser.add_parser(
'dump',
description='Download annotations for a CVAT task.'
)
dump_parser.add_argument(
'task_id',
type=int,
help='task ID'
)
dump_parser.add_argument(
'filename',
type=str,
help='output file'
)
dump_parser = task_subparser.add_parser("dump", description="Download annotations for a CVAT task.")
dump_parser.add_argument("task_id", type=int, help="task ID")
dump_parser.add_argument("filename", type=str, help="output file")
dump_parser.add_argument(
'--format',
dest='fileformat',
"--format",
dest="fileformat",
type=str,
default='CVAT for images 1.1',
help='annotation format (default: %(default)s)'
default="CVAT for images 1.1",
help="annotation format (default: %(default)s)",
)
#######################################################################
@ -340,56 +264,29 @@ dump_parser.add_argument(
#######################################################################
upload_parser = task_subparser.add_parser(
'upload',
description='Upload annotations for a CVAT task.'
"upload", description="Upload annotations for a CVAT task."
)
upload_parser.add_argument("task_id", type=int, help="task ID")
upload_parser.add_argument("filename", type=str, help="upload file")
upload_parser.add_argument(
'task_id',
type=int,
help='task ID'
)
upload_parser.add_argument(
'filename',
"--format",
dest="fileformat",
type=str,
help='upload file'
)
upload_parser.add_argument(
'--format',
dest='fileformat',
type=str,
default='CVAT 1.1',
help='annotation format (default: %(default)s)'
default="CVAT 1.1",
help="annotation format (default: %(default)s)",
)
#######################################################################
# Export task
#######################################################################
export_task_parser = task_subparser.add_parser(
'export',
description='Export a CVAT task.'
)
export_task_parser.add_argument(
'task_id',
type=int,
help='task ID'
)
export_task_parser.add_argument(
'filename',
type=str,
help='output file'
)
export_task_parser = task_subparser.add_parser("export", description="Export a CVAT task.")
export_task_parser.add_argument("task_id", type=int, help="task ID")
export_task_parser.add_argument("filename", type=str, help="output file")
#######################################################################
# Import task
#######################################################################
import_task_parser = task_subparser.add_parser(
'import',
description='Import a CVAT task.'
)
import_task_parser.add_argument(
'filename',
type=str,
help='upload file'
)
import_task_parser = task_subparser.add_parser("import", description="Import a CVAT task.")
import_task_parser.add_argument("filename", type=str, help="upload file")

@ -15,7 +15,7 @@ class StreamWithProgress:
self.stream = stream
self.pbar = pbar
if hasattr(stream, '__len__'):
if hasattr(stream, "__len__"):
length = len(stream)
self.length = length
@ -37,8 +37,10 @@ class StreamWithProgress:
def tell(self):
return self.stream.tell()
def expect_status(code: int, response: requests.Response) -> None:
response.raise_for_status()
if response.status_code != code:
raise Exception("Failed to upload file: "
f"unexpected status code received ({response.status_code})")
raise Exception(
"Failed to upload file: " f"unexpected status code received ({response.status_code})"
)

@ -8,16 +8,15 @@ import os
import unittest.mock as mock
from contextlib import closing, redirect_stdout
import cvat.apps.engine.log as log
from cvat.apps.engine.tests.test_rest_api import create_db_users, generate_image_file
from datumaro.util.scope import on_exit_do, scoped
from django.conf import settings
from PIL import Image
from rest_framework.test import APITestCase, RequestsClient
from datumaro.util.scope import scoped, on_exit_do
from tqdm import tqdm
import cvat.apps.engine.log as log
from cvat.apps.engine.tests.test_rest_api import (create_db_users,
generate_image_file)
from cvat_cli.core import CLI, CVAT_API_V2, ResourceType
from tqdm import tqdm
class TestCLI(APITestCase):
@ -32,16 +31,16 @@ class TestCLI(APITestCase):
self.mock_stdout = mock_stdout
self.client = RequestsClient()
self.credentials = ('admin', 'admin')
self.api = CVAT_API_V2('testserver')
self.credentials = ("admin", "admin")
self.api = CVAT_API_V2("testserver")
self.cli = CLI(self.client, self.api, self.credentials)
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.img_file = os.path.join(settings.SHARE_ROOT, 'test_cli.jpg')
cls.img_file = os.path.join(settings.SHARE_ROOT, "test_cli.jpg")
_, data = generate_image_file(cls.img_file)
with open(cls.img_file, 'wb') as image:
with open(cls.img_file, "wb") as image:
image.write(data.read())
@classmethod
@ -62,14 +61,19 @@ class TestCLI(APITestCase):
with closing(io.StringIO()) as pbar_out:
pbar = tqdm(file=pbar_out, mininterval=0)
task_id = self.cli.tasks_create('test_task',
[{'name' : 'car'}, {'name': 'person'}],
ResourceType.LOCAL, [self.img_file], pbar=pbar)
task_id = self.cli.tasks_create(
"test_task",
[{"name": "car"}, {"name": "person"}],
ResourceType.LOCAL,
[self.img_file],
pbar=pbar,
)
pbar_out = pbar_out.getvalue().strip('\r').split('\r')
pbar_out = pbar_out.getvalue().strip("\r").split("\r")
self.assertEqual(1, task_id)
self.assertRegex(pbar_out[-1], '100%')
self.assertRegex(pbar_out[-1], "100%")
class TestTaskOperations(APITestCase):
def setUp(self):
@ -83,14 +87,17 @@ class TestTaskOperations(APITestCase):
self.mock_stdout = mock_stdout
self.client = RequestsClient()
self.credentials = ('admin', 'admin')
self.api = CVAT_API_V2('testserver')
self.credentials = ("admin", "admin")
self.api = CVAT_API_V2("testserver")
self.cli = CLI(self.client, self.api, self.credentials)
self.taskname = 'test_task'
self.task_id = self.cli.tasks_create(self.taskname,
[{'name' : 'car'}, {'name': 'person'}],
ResourceType.LOCAL,
[self.img_file], pbar=mock.MagicMock())
self.taskname = "test_task"
self.task_id = self.cli.tasks_create(
self.taskname,
[{"name": "car"}, {"name": "person"}],
ResourceType.LOCAL,
[self.img_file],
pbar=mock.MagicMock(),
)
def tearDown(self):
super().tearDown()
@ -100,9 +107,9 @@ class TestTaskOperations(APITestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.img_file = os.path.join(settings.SHARE_ROOT, 'test_cli.jpg')
cls.img_file = os.path.join(settings.SHARE_ROOT, "test_cli.jpg")
_, data = generate_image_file(cls.img_file)
with open(cls.img_file, 'wb') as image:
with open(cls.img_file, "wb") as image:
image.write(data.read())
@classmethod
@ -116,31 +123,31 @@ class TestTaskOperations(APITestCase):
def test_tasks_list(self):
self.cli.tasks_list(False)
self.assertRegex(self.mock_stdout.getvalue(), '.*{}.*'.format(self.taskname))
self.assertRegex(self.mock_stdout.getvalue(), ".*{}.*".format(self.taskname))
def test_tasks_delete(self):
self.cli.tasks_delete([1])
self.cli.tasks_list(False)
self.assertRegex(self.mock_stdout.getvalue(), '.*Task ID {} deleted.*'.format(1))
self.assertRegex(self.mock_stdout.getvalue(), ".*Task ID {} deleted.*".format(1))
@scoped
def test_tasks_dump(self):
path = os.path.join(settings.SHARE_ROOT, 'test_cli.zip')
path = os.path.join(settings.SHARE_ROOT, "test_cli.zip")
with closing(io.StringIO()) as pbar_out:
pbar = tqdm(file=pbar_out, mininterval=0)
self.cli.tasks_dump(self.task_id, 'CVAT for images 1.1', path, pbar=pbar)
self.cli.tasks_dump(self.task_id, "CVAT for images 1.1", path, pbar=pbar)
on_exit_do(os.remove, path)
pbar_out = pbar_out.getvalue().strip('\r').split('\r')
pbar_out = pbar_out.getvalue().strip("\r").split("\r")
self.assertTrue(os.path.exists(path))
self.assertRegex(pbar_out[-1], '100%')
self.assertRegex(pbar_out[-1], "100%")
@scoped
def test_tasks_export(self):
path = os.path.join(settings.SHARE_ROOT, 'test_cli.zip')
path = os.path.join(settings.SHARE_ROOT, "test_cli.zip")
with closing(io.StringIO()) as pbar_out:
pbar = tqdm(file=pbar_out, mininterval=0)
@ -148,57 +155,55 @@ class TestTaskOperations(APITestCase):
self.cli.tasks_export(self.task_id, path, pbar=pbar)
on_exit_do(os.remove, path)
pbar_out = pbar_out.getvalue().strip('\r').split('\r')
pbar_out = pbar_out.getvalue().strip("\r").split("\r")
self.assertTrue(os.path.exists(path))
self.assertRegex(pbar_out[-1], '100%')
self.assertRegex(pbar_out[-1], "100%")
@scoped
def test_tasks_frame_original(self):
path = os.path.join(settings.SHARE_ROOT, 'task_1_frame_000000.jpg')
path = os.path.join(settings.SHARE_ROOT, "task_1_frame_000000.jpg")
self.cli.tasks_frame(self.task_id, [0],
outdir=settings.SHARE_ROOT, quality='original')
self.cli.tasks_frame(self.task_id, [0], outdir=settings.SHARE_ROOT, quality="original")
on_exit_do(os.remove, path)
self.assertTrue(os.path.exists(path))
@scoped
def test_tasks_frame(self):
path = os.path.join(settings.SHARE_ROOT, 'task_1_frame_000000.jpg')
path = os.path.join(settings.SHARE_ROOT, "task_1_frame_000000.jpg")
self.cli.tasks_frame(self.task_id, [0],
outdir=settings.SHARE_ROOT, quality='compressed')
self.cli.tasks_frame(self.task_id, [0], outdir=settings.SHARE_ROOT, quality="compressed")
on_exit_do(os.remove, path)
self.assertTrue(os.path.exists(path))
@scoped
def test_tasks_upload(self):
path = os.path.join(settings.SHARE_ROOT, 'test_cli.json')
path = os.path.join(settings.SHARE_ROOT, "test_cli.json")
self._generate_coco_file(path)
on_exit_do(os.remove, path)
with closing(io.StringIO()) as pbar_out:
pbar = tqdm(file=pbar_out, mininterval=0)
self.cli.tasks_upload(self.task_id, 'COCO 1.0', path, pbar=pbar)
self.cli.tasks_upload(self.task_id, "COCO 1.0", path, pbar=pbar)
pbar_out = pbar_out.getvalue().strip('\r').split('\r')
pbar_out = pbar_out.getvalue().strip("\r").split("\r")
self.assertRegex(self.mock_stdout.getvalue(), '.*{}.*'.format("annotation file"))
self.assertRegex(pbar_out[-1], '100%')
self.assertRegex(self.mock_stdout.getvalue(), ".*{}.*".format("annotation file"))
self.assertRegex(pbar_out[-1], "100%")
@scoped
def test_tasks_import(self):
anno_path = os.path.join(settings.SHARE_ROOT, 'test_cli.json')
anno_path = os.path.join(settings.SHARE_ROOT, "test_cli.json")
self._generate_coco_file(anno_path)
on_exit_do(os.remove, anno_path)
backup_path = os.path.join(settings.SHARE_ROOT, 'task_backup.zip')
backup_path = os.path.join(settings.SHARE_ROOT, "task_backup.zip")
with closing(io.StringIO()) as pbar_out:
pbar = tqdm(file=pbar_out, mininterval=0)
self.cli.tasks_upload(self.task_id, 'COCO 1.0', anno_path, pbar=pbar)
self.cli.tasks_upload(self.task_id, "COCO 1.0", anno_path, pbar=pbar)
self.cli.tasks_export(self.task_id, backup_path, pbar=pbar)
on_exit_do(os.remove, backup_path)
@ -207,17 +212,18 @@ class TestTaskOperations(APITestCase):
self.cli.tasks_import(backup_path, pbar=pbar)
pbar_out = pbar_out.getvalue().strip('\r').split('\r')
pbar_out = pbar_out.getvalue().strip("\r").split("\r")
self.assertRegex(self.mock_stdout.getvalue(), '.*{}.*'.format("exported sucessfully"))
self.assertRegex(pbar_out[-1], '100%')
self.assertRegex(self.mock_stdout.getvalue(), ".*{}.*".format("exported sucessfully"))
self.assertRegex(pbar_out[-1], "100%")
def _generate_coco_file(self, path):
test_image = Image.open(self.img_file)
image_width, image_height = test_image.size
content = self._generate_coco_anno(os.path.basename(self.img_file),
image_width=image_width, image_height=image_height)
content = self._generate_coco_anno(
os.path.basename(self.img_file), image_width=image_width, image_height=image_height
)
with open(path, "w") as coco:
coco.write(content)
@ -268,7 +274,7 @@ class TestTaskOperations(APITestCase):
]
}
""" % {
'image_path': image_path,
'image_height': image_height,
'image_width': image_width
"image_path": image_path,
"image_height": image_height,
"image_width": image_width,
}

Loading…
Cancel
Save