Simplify upload data for task (#5498)

It's now possible to create a task from cloud storage data by specifying
only the manifest file and a filename pattern.
The pattern supports the special characters `*`, `?`, `[seq]`, and `[!seq]`.
See
[these tests](https://github.com/opencv/cvat/blob/8898a8b264/tests/python/rest_api/test_tasks.py#L686)
for examples of how to use this functionality.
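
For instance, a task can be created from a manifest file plus a pattern with two
plain REST calls. The sketch below is illustrative only: the host, credentials,
and cloud storage id are placeholders, not part of this PR.

```python
import requests

CVAT_HOST = "http://localhost:8080"  # assumed local CVAT instance
AUTH = ("admin", "password")         # placeholder credentials

# 1. Create the task itself.
task = requests.post(
    f"{CVAT_HOST}/api/tasks",
    auth=AUTH,
    json={"name": "task with cloud data", "labels": [{"name": "car"}]},
).json()

# 2. Attach data: only the manifest file and a filename pattern are needed.
requests.post(
    f"{CVAT_HOST}/api/tasks/{task['id']}/data",
    auth=AUTH,
    json={
        "image_quality": 75,
        "use_cache": True,        # required when a manifest file is used
        "cloud_storage_id": 1,    # id of an already attached cloud storage
        "server_files": ["manifest.jsonl"],
        "filename_pattern": "images/*.jpeg",
    },
)
```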

Co-authored-by: Maxim Zhiltsov <zhiltsov.max35@gmail.com>
Maria Khrustaleva committed e624c5b959 (parent 207116705f)

CHANGELOG.md

```diff
@@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ## \[2.4.0] - Unreleased

 ### Added
-- TDB
+- Filename pattern to simplify uploading cloud storage data for a task (<https://github.com/opencv/cvat/pull/5498>)

 ### Changed
 - TDB
```

cvat/apps/engine/serializers.py

```diff
@@ -371,12 +371,13 @@ class DataSerializer(WriteOnceMixin, serializers.ModelSerializer):
     use_cache = serializers.BooleanField(default=False)
     copy_data = serializers.BooleanField(default=False)
     cloud_storage_id = serializers.IntegerField(write_only=True, allow_null=True, required=False)
+    filename_pattern = serializers.CharField(allow_null=True, required=False)

     class Meta:
         model = models.Data
         fields = ('chunk_size', 'size', 'image_quality', 'start_frame', 'stop_frame', 'frame_filter',
             'compressed_chunk_type', 'original_chunk_type', 'client_files', 'server_files', 'remote_files', 'use_zip_chunks',
-            'cloud_storage_id', 'use_cache', 'copy_data', 'storage_method', 'storage', 'sorting_method')
+            'cloud_storage_id', 'use_cache', 'copy_data', 'storage_method', 'storage', 'sorting_method', 'filename_pattern')

     # pylint: disable=no-self-use
     def validate_frame_filter(self, value):
@@ -396,6 +397,7 @@ class DataSerializer(WriteOnceMixin, serializers.ModelSerializer):
         if 'start_frame' in attrs and 'stop_frame' in attrs \
                 and attrs['start_frame'] > attrs['stop_frame']:
             raise serializers.ValidationError('Stop frame must be more or equal start frame')
+
         return attrs

     def create(self, validated_data):
```

cvat/apps/engine/task.py

```diff
@@ -5,6 +5,7 @@
 # SPDX-License-Identifier: MIT

 import itertools
+import fnmatch
 import os
 import sys

 from rest_framework.serializers import ValidationError
```
```diff
@@ -127,7 +128,7 @@ def _save_task_to_db(db_task, extractor):
     db_task.data.save()
     db_task.save()

-def _count_files(data, manifest_files=None):
+def _count_files(data):
     share_root = settings.SHARE_ROOT
     server_files = []
```
```diff
@@ -158,7 +159,7 @@ def _count_files(data, manifest_files=None):
             if mime in counter:
                 counter[mime].append(rel_path)
             elif rel_path.endswith('.jsonl'):
-                manifest_files.append(rel_path)
+                continue
             else:
                 slogger.glob.warn("Skip '{}' file (its mime type doesn't "
                     "correspond to supported MIME file type)".format(full_path))
```
```diff
@@ -177,6 +178,12 @@ def _count_files(data, manifest_files=None):

     return counter

+def _find_manifest_files(data):
+    manifest_files = []
+    for files in ['client_files', 'server_files', 'remote_files']:
+        manifest_files.extend(list(filter(lambda x: x.endswith('.jsonl'), data[files])))
+    return manifest_files
+
 def _validate_data(counter, manifest_files=None):
     unique_entries = 0
     multiple_entries = 0
```
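
A quick sketch of the new helper's behavior (the input dict is made up, not part
of the diff): only `.jsonl` entries are collected from the three file lists.

```python
data = {
    'client_files': [],
    'server_files': ['images/1.jpeg', 'images/manifest.jsonl'],
    'remote_files': [],
}
assert _find_manifest_files(data) == ['images/manifest.jsonl']
```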
```diff
@@ -207,10 +214,10 @@ def _validate_data(counter, manifest_files=None):

     return counter, task_modes[0]

-def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage):
+def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage, data_storage_method):
     if manifests:
         if len(manifests) != 1:
-            raise Exception('Only one manifest file can be attached with data')
+            raise ValidationError('Only one manifest file can be attached to data')
         manifest_file = manifests[0]
         full_manifest_path = os.path.join(root_dir, manifests[0])
         if is_in_cloud:
@@ -221,8 +228,10 @@ def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage):
                     < cloud_storage_instance.get_file_last_modified(manifest_file):
                 cloud_storage_instance.download_file(manifest_file, full_manifest_path)
         if is_manifest(full_manifest_path):
+            if not (settings.USE_CACHE or data_storage_method != models.StorageMethodChoice.CACHE):
+                raise ValidationError("Manifest file can be uploaded only if 'Use cache' option is also selected")
             return manifest_file
-        raise Exception('Invalid manifest was uploaded')
+        raise ValidationError('Invalid manifest was uploaded')
     return None

 def _validate_url(url):
```
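
Switching the bare `Exception` calls to DRF's `ValidationError` lets callers
distinguish invalid user input from internal failures, presumably so that bad
manifest input is reported as a validation failure rather than a generic server
error.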
```diff
@@ -291,6 +300,26 @@ def _download_data(urls, upload_dir):

 def _get_manifest_frame_indexer(start_frame=0, frame_step=1):
     return lambda frame_id: start_frame + frame_id * frame_step

+def _create_task_manifest_based_on_cloud_storage_manifest(
+    sorted_media,
+    cloud_storage_manifest_prefix,
+    cloud_storage_manifest,
+    manifest
+):
+    if cloud_storage_manifest_prefix:
+        sorted_media_without_manifest_prefix = [
+            os.path.relpath(i, cloud_storage_manifest_prefix) for i in sorted_media
+        ]
+        sequence, raw_content = cloud_storage_manifest.get_subset(sorted_media_without_manifest_prefix)
+        def _add_prefix(properties):
+            file_name = properties['name']
+            properties['name'] = os.path.join(cloud_storage_manifest_prefix, file_name)
+            return properties
+        content = list(map(_add_prefix, raw_content))
+    else:
+        sequence, content = cloud_storage_manifest.get_subset(sorted_media)
+    sorted_content = (i[1] for i in sorted(zip(sequence, content)))
+    manifest.create(sorted_content)
+
 @transaction.atomic
 def _create_thread(db_task, data, isBackupRestore=False, isDatasetImport=False):
```
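
The prefix handling in this helper can be seen in isolation below (a standalone
sketch with made-up paths): media paths are made relative to the manifest's
directory before querying the cloud storage manifest, and the prefix is
re-attached to the entries returned by `get_subset()`.

```python
import os

cloud_storage_manifest_prefix = "test/sub"  # directory holding the manifest
sorted_media = ["test/sub/image_1.jpeg", "test/sub/image_2.jpeg"]

# Paths as they appear inside the manifest (prefix stripped):
relative = [os.path.relpath(p, cloud_storage_manifest_prefix) for p in sorted_media]
assert relative == ["image_1.jpeg", "image_2.jpeg"]

# ...and the prefix is re-attached to each returned entry:
assert os.path.join(cloud_storage_manifest_prefix, relative[0]) == "test/sub/image_1.jpeg"
```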
```diff
@@ -300,69 +329,80 @@ def _create_thread(db_task, data, isBackupRestore=False, isDatasetImport=False):
     slogger.glob.info("create task #{}".format(db_task.id))

     db_data = db_task.data
-    upload_dir = db_data.get_upload_dirname()
+    upload_dir = db_data.get_upload_dirname() if db_data.storage != models.StorageChoice.SHARE else settings.SHARE_ROOT
     is_data_in_cloud = db_data.storage == models.StorageChoice.CLOUD_STORAGE

     if data['remote_files'] and not isDatasetImport:
         data['remote_files'] = _download_data(data['remote_files'], upload_dir)

-    manifest_files = []
-    media = _count_files(data, manifest_files)
-    media, task_mode = _validate_data(media, manifest_files)
-
-    if data['server_files']:
-        if db_data.storage == models.StorageChoice.LOCAL:
-            _copy_data_from_source(data['server_files'], upload_dir, data.get('server_files_path'))
-        elif db_data.storage == models.StorageChoice.SHARE:
-            upload_dir = settings.SHARE_ROOT
+    # find and validate manifest file
+    manifest_files = _find_manifest_files(data)
     manifest_root = None
-    if db_data.storage in {models.StorageChoice.LOCAL, models.StorageChoice.SHARE}:
+
+    # we should also handle this case because files from the share source have not been downloaded yet
+    if data['copy_data']:
+        manifest_root = settings.SHARE_ROOT
+    elif db_data.storage in {models.StorageChoice.LOCAL, models.StorageChoice.SHARE}:
         manifest_root = upload_dir
     elif is_data_in_cloud:
         manifest_root = db_data.cloud_storage.get_storage_dirname()
+
     manifest_file = _validate_manifest(
         manifest_files, manifest_root,
-        is_data_in_cloud, db_data.cloud_storage if is_data_in_cloud else None
+        is_data_in_cloud, db_data.cloud_storage if is_data_in_cloud else None,
+        db_data.storage_method,
     )
-    if manifest_file and (not settings.USE_CACHE or db_data.storage_method != models.StorageMethodChoice.CACHE):
-        raise Exception("File with meta information can be uploaded if 'Use cache' option is also selected")

-    if data['server_files'] and is_data_in_cloud:
+    if is_data_in_cloud:
         cloud_storage_instance = db_storage_to_storage_instance(db_data.cloud_storage)
-        sorted_media = sort(media['image'], data['sorting_method'])
-        data_size = len(sorted_media)
-        segment_step, *_ = _get_task_segment_data(db_task, data_size)
-        for start_frame in range(0, data_size, segment_step):
-            first_sorted_media_image = sorted_media[start_frame]
-            cloud_storage_instance.download_file(first_sorted_media_image, os.path.join(upload_dir, first_sorted_media_image))

-        # prepare task manifest file from cloud storage manifest file
-        # NOTE we should create manifest before defining chunk_size
-        # FIXME in the future when will be implemented archive support
         manifest = ImageManifestManager(db_data.get_manifest_path())
         cloud_storage_manifest = ImageManifestManager(
             os.path.join(db_data.cloud_storage.get_storage_dirname(), manifest_file),
             db_data.cloud_storage.get_storage_dirname()
         )
-        cloud_storage_manifest_prefix = os.path.dirname(manifest_file)
         cloud_storage_manifest.set_index()
-        if cloud_storage_manifest_prefix:
-            sorted_media_without_manifest_prefix = [
-                os.path.relpath(i, cloud_storage_manifest_prefix) for i in sorted_media
-            ]
-            sequence, raw_content = cloud_storage_manifest.get_subset(sorted_media_without_manifest_prefix)
-            def _add_prefix(properties):
-                file_name = properties['name']
-                properties['name'] = os.path.join(cloud_storage_manifest_prefix, file_name)
-                return properties
-            content = list(map(_add_prefix, raw_content))
-        else:
-            sequence, content = cloud_storage_manifest.get_subset(sorted_media)
-        sorted_content = (i[1] for i in sorted(zip(sequence, content)))
-        manifest.create(sorted_content)
+        cloud_storage_manifest_prefix = os.path.dirname(manifest_file)
+
+    # update the list of server files if the task creation approach with a pattern and a manifest file is used
+    if is_data_in_cloud and data['filename_pattern']:
+        if 1 != len(data['server_files']):
+            l = len(data['server_files']) - 1
+            raise ValidationError(
+                'Using a filename_pattern is only supported with a manifest file, '
+                f'but {l} other file{"s" if l > 1 else ""} {"were" if l > 1 else "was"} found. '
+                'Please remove the extra files and keep only the manifest file in the server_files field.'
+            )
+
+        cloud_storage_manifest_data = list(cloud_storage_manifest.data) if not cloud_storage_manifest_prefix \
+            else [os.path.join(cloud_storage_manifest_prefix, f) for f in cloud_storage_manifest.data]
+        if data['filename_pattern'] == '*':
+            server_files = cloud_storage_manifest_data
+        else:
+            server_files = fnmatch.filter(cloud_storage_manifest_data, data['filename_pattern'])
+        data['server_files'].extend(server_files)
+
+    # count and validate uploaded files
+    media = _count_files(data)
+    media, task_mode = _validate_data(media, manifest_files)
+
+    if data['server_files']:
+        if db_data.storage == models.StorageChoice.LOCAL:
+            _copy_data_from_source(data['server_files'], upload_dir, data.get('server_files_path'))
+        elif is_data_in_cloud:
+            sorted_media = sort(media['image'], data['sorting_method'])
+
+            # download previews from cloud storage
+            data_size = len(sorted_media)
+            segment_step, *_ = _get_task_segment_data(db_task, data_size)
+            for preview_frame in range(0, data_size, segment_step):
+                preview = sorted_media[preview_frame]
+                cloud_storage_instance.download_file(preview, os.path.join(upload_dir, preview))
+
+            # define task manifest content based on the cloud storage manifest content and the uploaded files
+            _create_task_manifest_based_on_cloud_storage_manifest(
+                sorted_media, cloud_storage_manifest_prefix,
+                cloud_storage_manifest, manifest)

     av_scan_paths(upload_dir)
```
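
Note that `fnmatch` matching is not path-aware: `*` also matches across `/`,
which is why the pattern `*` selects every manifest entry and `test/*` matches
files in nested subdirectories too. A small standalone demonstration (the file
names are made up):

```python
import fnmatch

files = ["a_0.jpeg", "b_1.jpeg", "d_2.jpeg", "test/sub/image_1.jpeg"]

assert fnmatch.filter(files, "[a-c]*.jpeg") == ["a_0.jpeg", "b_1.jpeg"]
assert fnmatch.filter(files, "[!a-c]*.jpeg") == ["d_2.jpeg", "test/sub/image_1.jpeg"]
# '*' is not limited by '/', so nested entries match as well:
assert fnmatch.filter(files, "test/*") == ["test/sub/image_1.jpeg"]
```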

tests/python/rest_api/test_tasks.py

```diff
@@ -4,8 +4,12 @@
 # SPDX-License-Identifier: MIT

 import json
+import os.path as osp
+import subprocess
 from copy import deepcopy
+from functools import partial
 from http import HTTPStatus
+from tempfile import TemporaryDirectory
 from time import sleep

 import pytest
@@ -13,6 +17,7 @@ from cvat_sdk.api_client import apis, models
 from cvat_sdk.core.helpers import get_paginated_collection
 from deepdiff import DeepDiff

+import shared.utils.s3 as s3
 from shared.utils.config import get_method, make_api_client, patch_method
 from shared.utils.helpers import generate_image_files
```
```diff
@@ -675,6 +680,120 @@ class TestPostTaskData:
             self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
         )

+    @pytest.mark.with_external_services
+    @pytest.mark.parametrize("cloud_storage_id", [1])
+    @pytest.mark.parametrize(
+        "manifest, filename_pattern, sub_dir, task_size",
+        [
+            ("manifest.jsonl", "*", True, 3),  # public bucket
+            ("manifest.jsonl", "test/*", True, 3),
+            ("manifest.jsonl", "test/sub*1.jpeg", True, 1),
+            ("manifest.jsonl", "*image*.jpeg", True, 3),
+            ("manifest.jsonl", "wrong_pattern", True, 0),
+            ("abc_manifest.jsonl", "[a-c]*.jpeg", False, 2),
+            ("abc_manifest.jsonl", "[d]*.jpeg", False, 1),
+            ("abc_manifest.jsonl", "[e-z]*.jpeg", False, 0),
+        ],
+    )
+    @pytest.mark.parametrize("org", [""])
+    def test_create_task_with_file_pattern(
+        self,
+        cloud_storage_id,
+        manifest,
+        filename_pattern,
+        sub_dir,
+        task_size,
+        org,
+        cloud_storages,
+        request,
+    ):
+        # prepare dataset on the bucket
+        prefixes = ("test_image_",) * 3 if sub_dir else ("a_", "b_", "d_")
+        images = generate_image_files(3, prefixes=prefixes)
+        s3_client = s3.make_client()
+
+        cloud_storage = cloud_storages[cloud_storage_id]
+
+        for image in images:
+            s3_client.create_file(
+                data=image,
+                bucket=cloud_storage["resource"],
+                filename=f"{'test/sub/' if sub_dir else ''}{image.name}",
+            )
+            request.addfinalizer(
+                partial(
+                    s3_client.remove_file,
+                    bucket=cloud_storage["resource"],
+                    filename=f"{'test/sub/' if sub_dir else ''}{image.name}",
+                )
+            )
+
+        with TemporaryDirectory() as tmp_dir:
+            for image in images:
+                with open(osp.join(tmp_dir, image.name), "wb") as f:
+                    f.write(image.getvalue())
+
+            command = [
+                "docker",
+                "run",
+                "--rm",
+                "-u",
+                "root:root",
+                "-v",
+                f"{tmp_dir}:/local",
+                "--entrypoint",
+                "python3",
+                "cvat/server",
+                "utils/dataset_manifest/create.py",
+                "--output-dir",
+                "/local",
+                "/local",
+            ]
+            subprocess.run(command, check=True)
+
+            with open(osp.join(tmp_dir, "manifest.jsonl"), mode="rb") as m_file:
+                s3_client.create_file(
+                    data=m_file.read(),
+                    bucket=cloud_storage["resource"],
+                    filename=f"test/sub/{manifest}" if sub_dir else manifest,
+                )
+                request.addfinalizer(
+                    partial(
+                        s3_client.remove_file,
+                        bucket=cloud_storage["resource"],
+                        filename=f"test/sub/{manifest}" if sub_dir else manifest,
+                    )
+                )
+
+        task_spec = {
+            "name": f"Task with files from cloud storage {cloud_storage_id}",
+            "labels": [
+                {
+                    "name": "car",
+                }
+            ],
+        }
+
+        data_spec = {
+            "image_quality": 75,
+            "use_cache": True,
+            "cloud_storage_id": cloud_storage_id,
+            "server_files": [f"test/sub/{manifest}" if sub_dir else manifest],
+            "filename_pattern": filename_pattern,
+        }
+
+        if task_size:
+            task_id = self._test_create_task(
+                self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
+            )
+
+            with make_api_client(self._USERNAME) as api_client:
+                (task, response) = api_client.tasks_api.retrieve(task_id, org=org)
+                assert response.status == HTTPStatus.OK
+                assert task.size == task_size
+        else:
+            status = self._test_cannot_create_task(self._USERNAME, task_spec, data_spec)
+            assert "No media data found" in status.message
+
     @pytest.mark.with_external_services
     @pytest.mark.parametrize(
         "cloud_storage_id, manifest, org",
```

tests/python/shared/utils/helpers.py

```diff
@@ -18,10 +18,11 @@ def generate_image_file(filename="image.png", size=(50, 50), color=(0, 0, 0)):
     return f

-def generate_image_files(count) -> List[BytesIO]:
+def generate_image_files(count, prefixes=None) -> List[BytesIO]:
     images = []
     for i in range(count):
-        image = generate_image_file(f"{i}.jpeg", color=(i, i, i))
+        prefix = prefixes[i] if prefixes else ""
+        image = generate_image_file(f"{prefix}{i}.jpeg", color=(i, i, i))
         images.append(image)

     return images
```
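
A quick sketch of the extended helper's behavior (relying on the generated
files keeping their `name` attribute, as the test above does):

```python
from shared.utils.helpers import generate_image_files

images = generate_image_files(3, prefixes=("a_", "b_", "d_"))
assert [i.name for i in images] == ["a_0.jpeg", "b_1.jpeg", "d_2.jpeg"]
```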
