Tus for task annotations import (#4327)

* add data_type to tus mixin

* added tus for task annotations import

* added tus for jobs annotations import

* applied comments

* fix test

* fix incorrect upload endpoint

* add location creation based on origin

* remove unused import

* remove data_type

* remove unused comment

* update changelog to new release

Co-authored-by: Nikita Manovich <nikita.manovich@intel.com>
Kirill Lakhov 4 years ago committed by GitHub
parent 1225fbb1bc
commit 1fa2676a56

@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## \[2.1.0] - Unreleased
### Added
- Importing task annotations via chunk uploads (<https://github.com/openvinotoolkit/cvat/pull/4327>)
- Advanced filtration and sorting for a list of tasks/projects/cloudstorages (<https://github.com/openvinotoolkit/cvat/pull/4403>)
### Changed

@@ -45,6 +45,49 @@
});
}
async function chunkUpload(file, uploadConfig) {
const params = enableOrganization();
const {
endpoint, chunkSize, totalSize, onUpdate,
} = uploadConfig;
let { totalSentSize } = uploadConfig;
return new Promise((resolve, reject) => {
const upload = new tus.Upload(file, {
endpoint,
metadata: {
filename: file.name,
filetype: file.type,
},
headers: {
Authorization: Axios.defaults.headers.common.Authorization,
},
chunkSize,
retryDelays: null,
onError(error) {
reject(error);
},
onBeforeRequest(req) {
const xhr = req.getUnderlyingObject();
const { org } = params;
req.setHeader('X-Organization', org);
xhr.withCredentials = true;
},
onProgress(bytesUploaded) {
if (onUpdate && Number.isInteger(totalSentSize) && Number.isInteger(totalSize)) {
const currentUploadedSize = totalSentSize + bytesUploaded;
const percentage = currentUploadedSize / totalSize;
onUpdate(percentage);
}
},
onSuccess() {
if (totalSentSize) totalSentSize += file.size;
resolve(totalSentSize);
},
});
upload.start();
});
}
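
Note: the helper above delegates the wire protocol to tus-js-client. For reference, the raw exchange it drives looks roughly like the following minimal Python sketch of a TUS 1.0.0 client (the endpoint, token, and chunk size are illustrative placeholders, not CVAT code):

import base64
import os
import requests

def tus_upload(endpoint, path, token, chunk_size=100 * 1024 * 1024):
    # Creation request: declare total length and metadata, receive a
    # Location header pointing at the per-file upload resource.
    size = os.path.getsize(path)
    filename_b64 = base64.b64encode(os.path.basename(path).encode()).decode()
    common = {'Tus-Resumable': '1.0.0', 'Authorization': 'Token {}'.format(token)}
    response = requests.post(endpoint, headers=dict(common, **{
        'Upload-Length': str(size), 'Upload-Metadata': 'filename ' + filename_b64}))
    response.raise_for_status()
    location = response.headers['Location']
    # Append chunks with PATCH; the server echoes back the new Upload-Offset.
    offset = 0
    with open(path, 'rb') as f:
        while offset < size:
            chunk = f.read(chunk_size)
            response = requests.patch(location, data=chunk, headers=dict(common, **{
                'Upload-Offset': str(offset),
                'Content-Type': 'application/offset+octet-stream'}))
            response.raise_for_status()
            offset = int(response.headers['Upload-Offset'])
    return location
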
function generateError(errorData) {
if (errorData.response) {
const message = `${errorData.message}. ${JSON.stringify(errorData.response.data) || ''}.`;
@@ -816,42 +859,6 @@
onUpdate('The data are being uploaded to the server..', null);
async function chunkUpload(taskId, file) {
return new Promise((resolve, reject) => {
const upload = new tus.Upload(file, {
endpoint: `${origin}${backendAPI}/tasks/${taskId}/data/`,
metadata: {
filename: file.name,
filetype: file.type,
},
headers: {
Authorization: `Token ${store.get('token')}`,
},
chunkSize,
retryDelays: null,
onError(error) {
reject(error);
},
onBeforeRequest(req) {
const xhr = req.getUnderlyingObject();
const { org } = params;
req.setHeader('X-Organization', org);
xhr.withCredentials = true;
},
onProgress(bytesUploaded) {
const currentUploadedSize = totalSentSize + bytesUploaded;
const percentage = currentUploadedSize / totalSize;
onUpdate('The data are being uploaded to the server', percentage);
},
onSuccess() {
totalSentSize += file.size;
resolve();
},
});
upload.start();
});
}
async function bulkUpload(taskId, files) {
const fileBulks = files.reduce((fileGroups, file) => {
const lastBulk = fileGroups[fileGroups.length - 1];
@@ -891,8 +898,17 @@
proxy: config.proxy,
headers: { 'Upload-Start': true },
});
const uploadConfig = {
endpoint: `${origin}${backendAPI}/tasks/${response.data.id}/data/`,
onUpdate: (percentage) => {
onUpdate('The data are being uploaded to the server', percentage);
},
chunkSize,
totalSize,
totalSentSize,
};
for (const file of chunkFiles) {
await chunkUpload(response.data.id, file);
uploadConfig.totalSentSize += await chunkUpload(file, uploadConfig);
}
if (bulkFiles.length > 0) {
await bulkUpload(response.data.id, bulkFiles);
@@ -1215,38 +1231,57 @@
// Session is 'task' or 'job'
async function uploadAnnotations(session, id, file, format) {
const { backendAPI } = config;
const { backendAPI, origin } = config;
const params = {
...enableOrganization(),
format,
filename: file.name,
};
let annotationData = new FormData();
annotationData.append('annotation_file', file);
return new Promise((resolve, reject) => {
async function request() {
try {
const response = await Axios.put(
`${backendAPI}/${session}s/${id}/annotations`,
annotationData,
{
params,
proxy: config.proxy,
},
);
if (response.status === 202) {
annotationData = new FormData();
setTimeout(request, 3000);
} else {
resolve();
const chunkSize = config.uploadChunkSize * 1024 * 1024;
const uploadConfig = {
chunkSize,
endpoint: `${origin}${backendAPI}/${session}s/${id}/annotations/`,
};
try {
await Axios.post(`${backendAPI}/${session}s/${id}/annotations`,
new FormData(), {
params,
proxy: config.proxy,
headers: { 'Upload-Start': true },
});
await chunkUpload(file, uploadConfig);
await Axios.post(`${backendAPI}/${session}s/${id}/annotations`,
new FormData(), {
params,
proxy: config.proxy,
headers: { 'Upload-Finish': true },
});
return new Promise((resolve, reject) => {
async function requestStatus() {
try {
const response = await Axios.put(
`${backendAPI}/${session}s/${id}/annotations`,
new FormData(),
{
params,
proxy: config.proxy,
},
);
if (response.status === 202) {
setTimeout(requestStatus, 3000);
} else {
resolve();
}
} catch (errorData) {
reject(generateError(errorData));
}
} catch (errorData) {
reject(generateError(errorData));
}
}
setTimeout(request);
});
setTimeout(requestStatus);
});
} catch (errorData) {
generateError(errorData);
return null;
}
}
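
The rewritten uploadAnnotations thus runs a four-step sequence: announce the upload, stream the file over TUS, signal completion, and poll until the import job leaves the 202 state. The same sequence sketched in Python, under the same assumptions as the earlier sketch (base_url and token are placeholders; tus_upload is the helper sketched above):

import os
import time
import requests

def import_annotations(base_url, session, obj_id, path, format_name, token):
    url = '{}/api/{}s/{}/annotations'.format(base_url, session, obj_id)
    headers = {'Authorization': 'Token {}'.format(token)}
    params = {'format': format_name, 'filename': os.path.basename(path)}
    # 1. Upload-Start: the server prepares to accept a TUS upload.
    requests.post(url, params=params,
                  headers=dict(headers, **{'Upload-Start': 'true'})).raise_for_status()
    # 2. Stream the annotation file in resumable chunks.
    tus_upload(url + '/', path, token)
    # 3. Upload-Finish: the server enqueues the actual import job.
    requests.post(url, params=params,
                  headers=dict(headers, **{'Upload-Finish': 'true'})).raise_for_status()
    # 4. Poll: 202 means the import is still running; 201 means it finished.
    while True:
        response = requests.put(url, params=params, headers=headers)
        response.raise_for_status()
        if response.status_code != 202:
            break
        time.sleep(3)
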
// Session is 'task' or 'job'

@@ -9,7 +9,6 @@ import uuid
from django.conf import settings
from django.core.cache import cache
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response
from cvat.apps.engine.serializers import DataSerializer
@@ -26,6 +25,7 @@ class TusFile:
self.offset = cache.get("tus-uploads/{}/offset".format(file_id))
def init_file(self):
os.makedirs(self.upload_dir, exist_ok=True)
file_path = os.path.join(self.upload_dir, self.file_id)
with open(file_path, 'wb') as file:
file.seek(self.file_size - 1)
@@ -100,7 +100,7 @@ class UploadMixin(object):
'Access-Control-Allow-Headers': "Tus-Resumable,upload-length,upload-metadata,Location,Upload-Offset,content-type",
'Cache-Control': 'no-store'
}
_file_id_regex = r'(?P<file_id>\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)'
file_id_regex = r'(?P<file_id>\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)'
def _tus_response(self, status, data=None, extra_headers=None):
response = Response(data, status)
@@ -147,9 +147,6 @@
if request.method == 'OPTIONS':
return self._tus_response(status=status.HTTP_204)
else:
if not self.can_upload():
return self._tus_response(data='Adding more data is not allowed',
status=status.HTTP_400_BAD_REQUEST)
metadata = self._get_metadata(request)
filename = metadata.get('filename', '')
if not self.validate_filename(filename):
@@ -173,13 +170,14 @@
tus_file = TusFile.create_file(metadata, file_size, self.get_upload_dir())
location = request.build_absolute_uri()
if 'HTTP_X_FORWARDED_HOST' not in request.META:
location = request.META.get('HTTP_ORIGIN') + request.META.get('PATH_INFO')
return self._tus_response(
status=status.HTTP_201_CREATED,
extra_headers={'Location': '{}{}'.format(request.build_absolute_uri(), tus_file.file_id)})
extra_headers={'Location': '{}{}'.format(location, tus_file.file_id)})
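
The fallback above matters when CVAT runs behind a proxy that does not set X-Forwarded-Host: build_absolute_uri() would leak the internal host into the TUS Location header, so the client-supplied Origin is used instead. A toy illustration of the rule, with META keys named as Django exposes them:

def tus_location(meta, absolute_uri):
    # Mirrors the mixin: trust the proxy-aware URI when the forwarded host
    # is known, otherwise rebuild the URL from the request's Origin header.
    if 'HTTP_X_FORWARDED_HOST' in meta:
        return absolute_uri
    return meta.get('HTTP_ORIGIN') + meta.get('PATH_INFO')

tus_location(
    {'HTTP_ORIGIN': 'https://cvat.example.com', 'PATH_INFO': '/api/tasks/1/data/'},
    'http://backend:8080/api/tasks/1/data/')
# -> 'https://cvat.example.com/api/tasks/1/data/'
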
@action(detail=True, methods=['HEAD', 'PATCH'], url_path=r'data/'+_file_id_regex)
def append_tus_chunk(self, request, pk, file_id):
self.get_object() # call check_object_permissions as well
def append_tus_chunk(self, request, file_id):
if request.method == 'HEAD':
tus_file = TusFile.get_tusfile(str(file_id), self.get_upload_dir())
if tus_file:
@@ -211,26 +209,16 @@
file_path = os.path.join(upload_dir, filename)
return os.path.commonprefix((os.path.realpath(file_path), upload_dir)) == upload_dir
def can_upload(self):
db_model = self.get_object()
model_data = db_model.data
return model_data.size == 0
def get_upload_dir(self):
db_model = self.get_object()
return db_model.data.get_upload_dirname()
return self._object.data.get_upload_dirname()
def get_request_client_files(self, request):
db_model = self.get_object()
serializer = DataSerializer(db_model, data=request.data)
serializer = DataSerializer(self._object, data=request.data)
serializer.is_valid(raise_exception=True)
data = {k: v for k, v in serializer.validated_data.items()}
return data.get('client_files', None);
return data.get('client_files', None)
def append(self, request):
if not self.can_upload():
return Response(data='Adding more data is not allowed',
status=status.HTTP_400_BAD_REQUEST)
client_files = self.get_request_client_files(request)
if client_files:
upload_dir = self.get_upload_dir()

@@ -305,6 +305,9 @@
def get_task_artifacts_dirname(self):
return os.path.join(self.get_task_dirname(), 'artifacts')
def get_tmp_dirname(self):
return os.path.join(self.get_task_dirname(), "tmp")
def __str__(self):
return self.name

@@ -636,39 +636,67 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
return Response(serializer.data)
# UploadMixin method
def get_upload_dir(self):
if 'annotations' in self.action:
return self._object.get_tmp_dirname()
elif 'data' in self.action:
return self._object.data.get_upload_dirname()
return ""
# UploadMixin method
def upload_finished(self, request):
db_task = self.get_object() # call check_object_permissions as well
task_data = db_task.data
serializer = DataSerializer(task_data, data=request.data)
serializer.is_valid(raise_exception=True)
data = dict(serializer.validated_data.items())
uploaded_files = task_data.get_uploaded_files()
uploaded_files.extend(data.get('client_files'))
serializer.validated_data.update({'client_files': uploaded_files})
db_data = serializer.save()
db_task.data = db_data
db_task.save()
data = {k: v for k, v in serializer.data.items()}
data['use_zip_chunks'] = serializer.validated_data['use_zip_chunks']
data['use_cache'] = serializer.validated_data['use_cache']
data['copy_data'] = serializer.validated_data['copy_data']
if data['use_cache']:
db_task.data.storage_method = StorageMethodChoice.CACHE
db_task.data.save(update_fields=['storage_method'])
if data['server_files'] and not data.get('copy_data'):
db_task.data.storage = StorageChoice.SHARE
db_task.data.save(update_fields=['storage'])
if db_data.cloud_storage:
db_task.data.storage = StorageChoice.CLOUD_STORAGE
db_task.data.save(update_fields=['storage'])
# if the value of stop_frame is 0, then inside the function we cannot know
# whether it is the value specified by the user or its default value from the database
if 'stop_frame' not in serializer.validated_data:
data['stop_frame'] = None
task.create(db_task.id, data)
return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
if self.action == 'annotations':
format_name = request.query_params.get("format", "")
filename = request.query_params.get("filename", "")
tmp_dir = self._object.get_tmp_dirname()
if os.path.isfile(os.path.join(tmp_dir, filename)):
annotation_file = os.path.join(tmp_dir, filename)
return _import_annotations(
request=request,
filename=annotation_file,
rq_id="{}@/api/tasks/{}/annotations/upload".format(request.user, self._object.pk),
rq_func=dm.task.import_task_annotations,
pk=self._object.pk,
format_name=format_name,
)
else:
return Response(data='No such file was uploaded',
status=status.HTTP_400_BAD_REQUEST)
elif self.action == 'data':
task_data = self._object.data
serializer = DataSerializer(task_data, data=request.data)
serializer.is_valid(raise_exception=True)
data = dict(serializer.validated_data.items())
uploaded_files = task_data.get_uploaded_files()
uploaded_files.extend(data.get('client_files'))
serializer.validated_data.update({'client_files': uploaded_files})
db_data = serializer.save()
self._object.data = db_data
self._object.save()
data = {k: v for k, v in serializer.data.items()}
data['use_zip_chunks'] = serializer.validated_data['use_zip_chunks']
data['use_cache'] = serializer.validated_data['use_cache']
data['copy_data'] = serializer.validated_data['copy_data']
if data['use_cache']:
self._object.data.storage_method = StorageMethodChoice.CACHE
self._object.data.save(update_fields=['storage_method'])
if data['server_files'] and not data.get('copy_data'):
self._object.data.storage = StorageChoice.SHARE
self._object.data.save(update_fields=['storage'])
if db_data.cloud_storage:
self._object.data.storage = StorageChoice.CLOUD_STORAGE
self._object.data.save(update_fields=['storage'])
# if the value of stop_frame is 0, then inside the function we cannot know
# whether it is the value specified by the user or its default value from the database
if 'stop_frame' not in serializer.validated_data:
data['stop_frame'] = None
task.create(self._object.id, data)
return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
return Response(data='Unknown upload was finished',
status=status.HTTP_400_BAD_REQUEST)
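
Taken together, this is the contract UploadMixin expects from a host viewset: supply get_upload_dir() and an upload_finished() hook keyed on self.action, route POST/OPTIONS through upload_data(), and expose a per-file chunk endpoint. A stripped-down sketch of that wiring (ExampleViewSet and start_import are hypothetical; the hooks and actions mirror the TaskViewSet/JobViewSet changes in this diff):

import os
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from cvat.apps.engine.mixins import UploadMixin

class ExampleViewSet(UploadMixin, viewsets.GenericViewSet):
    def get_upload_dir(self):
        # Mixin hook: directory where TUS chunks are assembled.
        return self._object.get_tmp_dirname()

    def upload_finished(self, request):
        # Mixin hook: invoked once the client sends the Upload-Finish header.
        filename = request.query_params.get('filename', '')
        path = os.path.join(self.get_upload_dir(), filename)
        if os.path.isfile(path):
            return self.start_import(request, path)  # hypothetical import entry point
        return Response(data='No such file was uploaded',
                        status=status.HTTP_400_BAD_REQUEST)

    @action(detail=True, methods=['GET', 'POST', 'OPTIONS'], url_path=r'annotations/?$')
    def annotations(self, request, pk):
        self._object = self.get_object()  # also runs object permission checks
        if request.method in ('POST', 'OPTIONS'):
            return self.upload_data(request)  # mixin drives the TUS handshake
        ...

    @action(detail=True, methods=['HEAD', 'PATCH'],
            url_path='annotations/' + UploadMixin.file_id_regex)
    def append_annotations_chunk(self, request, pk, file_id):
        # Per-file chunk endpoint; _object must be set before delegating.
        self._object = self.get_object()
        return self.append_tus_chunk(request, file_id)
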
@extend_schema(methods=['POST'],
summary='Method permanently attaches images or video to a task. Supports tus uploads, see more https://tus.io/',
@@ -700,14 +728,14 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
}, tags=['tasks'], versions=['2.0'])
@action(detail=True, methods=['OPTIONS', 'POST', 'GET'], url_path=r'data/?$')
def data(self, request, pk):
db_task = self.get_object() # call check_object_permissions as well
self._object = self.get_object() # call check_object_permissions as well
if request.method == 'POST' or request.method == 'OPTIONS':
task_data = db_task.data
task_data = self._object.data
if not task_data:
task_data = Data.objects.create()
task_data.make_dirs()
db_task.data = task_data
db_task.save()
self._object.data = task_data
self._object.save()
elif task_data.size != 0:
return Response(data='Adding more data is not supported',
status=status.HTTP_400_BAD_REQUEST)
@@ -719,10 +747,15 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
data_quality = request.query_params.get('quality', 'compressed')
data_getter = DataChunkGetter(data_type, data_num, data_quality,
db_task.dimension)
self._object.dimension)
return data_getter(request, db_task.data.start_frame,
db_task.data.stop_frame, db_task.data)
return data_getter(request, self._object.data.start_frame,
self._object.data.stop_frame, self._object.data)
@action(detail=True, methods=['HEAD', 'PATCH'], url_path='data/'+UploadMixin.file_id_regex)
def append_data_chunk(self, request, pk, file_id):
self._object = self.get_object()
return self.append_tus_chunk(request, file_id)
@extend_schema(methods=['GET'], summary='Method allows to download task annotations',
parameters=[
@@ -759,14 +792,14 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
responses={
'204': OpenApiResponse(description='The annotation has been deleted'),
}, tags=['tasks'], versions=['2.0'])
@action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH'],
@action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH', 'POST', 'OPTIONS'], url_path=r'annotations/?$',
serializer_class=LabeledDataSerializer)
def annotations(self, request, pk):
db_task = self.get_object() # force to call check_object_permissions
self._object = self.get_object() # force to call check_object_permissions
if request.method == 'GET':
format_name = request.query_params.get('format')
if format_name:
return _export_annotations(db_instance=db_task,
return _export_annotations(db_instance=self._object,
rq_id="/api/tasks/{}/annotations/{}".format(pk, format_name),
request=request,
action=request.query_params.get("action", "").lower(),
@@ -779,6 +812,8 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
serializer = LabeledDataSerializer(data=data)
if serializer.is_valid(raise_exception=True):
return Response(serializer.data)
elif request.method == 'POST' or request.method == 'OPTIONS':
return self.upload_data(request)
elif request.method == 'PUT':
format_name = request.query_params.get('format')
if format_name:
@@ -810,6 +845,11 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
return Response(data=str(e), status=status.HTTP_400_BAD_REQUEST)
return Response(data)
@action(detail=True, methods=['HEAD', 'PATCH'], url_path='annotations/'+UploadMixin.file_id_regex)
def append_annotations_chunk(self, request, pk, file_id):
self._object = self.get_object()
return self.append_tus_chunk(request, file_id)
@extend_schema(
summary='When task is being created the method returns information about a status of the creation process',
responses={
@@ -928,7 +968,7 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
'200': JobWriteSerializer,
}, tags=['jobs'], versions=['2.0']))
class JobViewSet(viewsets.GenericViewSet, mixins.ListModelMixin,
mixins.RetrieveModelMixin, mixins.UpdateModelMixin):
mixins.RetrieveModelMixin, mixins.UpdateModelMixin, UploadMixin):
queryset = Job.objects.all()
iam_organization_field = 'segment__task__organization'
search_fields = ('task_name', 'project_name', 'assignee', 'state', 'stage')
@@ -960,6 +1000,34 @@ class JobViewSet(viewsets.GenericViewSet, mixins.ListModelMixin,
else:
return JobWriteSerializer
# UploadMixin method
def get_upload_dir(self):
task = self._object.segment.task
return task.get_tmp_dirname()
# UploadMixin method
def upload_finished(self, request):
task = self._object.segment.task
if self.action == 'annotations':
format_name = request.query_params.get("format", "")
filename = request.query_params.get("filename", "")
tmp_dir = task.get_tmp_dirname()
if os.path.isfile(os.path.join(tmp_dir, filename)):
annotation_file = os.path.join(tmp_dir, filename)
return _import_annotations(
request=request,
filename=annotation_file,
rq_id="{}@/api/jobs/{}/annotations/upload".format(request.user, self._object.pk),
rq_func=dm.task.import_job_annotations,
pk=self._object.pk,
format_name=format_name,
)
else:
return Response(data='No such file was uploaded',
status=status.HTTP_400_BAD_REQUEST)
return Response(data='Unknown upload was finished',
status=status.HTTP_400_BAD_REQUEST)
@extend_schema(methods=['GET'], summary='Method returns annotations for a specific job',
responses={
'200': LabeledDataSerializer(many=True),
@@ -983,13 +1051,15 @@ class JobViewSet(viewsets.GenericViewSet, mixins.ListModelMixin,
responses={
'204': OpenApiResponse(description='The annotation has been deleted'),
}, tags=['jobs'], versions=['2.0'])
@action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH'],
@action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH', 'POST', 'OPTIONS'], url_path=r'annotations/?$',
serializer_class=LabeledDataSerializer)
def annotations(self, request, pk):
self.get_object() # force to call check_object_permissions
self._object = self.get_object() # force to call check_object_permissions
if request.method == 'GET':
data = dm.task.get_job_data(pk)
return Response(data)
elif request.method == 'POST' or request.method == 'OPTIONS':
return self.upload_data(request)
elif request.method == 'PUT':
format_name = request.query_params.get('format', '')
if format_name:
@@ -1024,6 +1094,11 @@ class JobViewSet(viewsets.GenericViewSet, mixins.ListModelMixin,
return Response(data=str(e), status=status.HTTP_400_BAD_REQUEST)
return Response(data)
@action(detail=True, methods=['HEAD', 'PATCH'], url_path='annotations/'+UploadMixin.file_id_regex)
def append_annotations_chunk(self, request, pk, file_id):
self._object = self.get_object()
return self.append_tus_chunk(request, file_id)
@extend_schema(
summary='Method returns list of issues for the job',
responses={
@@ -1549,7 +1624,7 @@ def rq_handler(job, exc_type, exc_value, tb):
return True
def _import_annotations(request, rq_id, rq_func, pk, format_name):
def _import_annotations(request, rq_id, rq_func, pk, format_name, filename=None):
format_desc = {f.DISPLAY_NAME: f
for f in dm.views.get_import_formats()}.get(format_name)
if format_desc is None:
@@ -1562,31 +1637,35 @@ def _import_annotations(request, rq_id, rq_func, pk, format_name):
rq_job = queue.fetch_job(rq_id)
if not rq_job:
serializer = AnnotationFileSerializer(data=request.data)
if serializer.is_valid(raise_exception=True):
anno_file = serializer.validated_data['annotation_file']
fd, filename = mkstemp(prefix='cvat_{}'.format(pk))
with open(filename, 'wb+') as f:
for chunk in anno_file.chunks():
f.write(chunk)
av_scan_paths(filename)
rq_job = queue.enqueue_call(
func=rq_func,
args=(pk, filename, format_name),
job_id=rq_id
)
rq_job.meta['tmp_file'] = filename
rq_job.meta['tmp_file_descriptor'] = fd
rq_job.save_meta()
# If a filename is specified, the file was uploaded via TUS and already exists in the filesystem,
# so there is no need to create a temporary file
fd = None
if not filename:
serializer = AnnotationFileSerializer(data=request.data)
if serializer.is_valid(raise_exception=True):
anno_file = serializer.validated_data['annotation_file']
fd, filename = mkstemp(prefix='cvat_{}'.format(pk))
with open(filename, 'wb+') as f:
for chunk in anno_file.chunks():
f.write(chunk)
av_scan_paths(filename)
rq_job = queue.enqueue_call(
func=rq_func,
args=(pk, filename, format_name),
job_id=rq_id
)
rq_job.meta['tmp_file'] = filename
rq_job.meta['tmp_file_descriptor'] = fd
rq_job.save_meta()
else:
if rq_job.is_finished:
os.close(rq_job.meta['tmp_file_descriptor'])
if rq_job.meta['tmp_file_descriptor'] is not None:
    os.close(rq_job.meta['tmp_file_descriptor'])
os.remove(rq_job.meta['tmp_file'])
rq_job.delete()
return Response(status=status.HTTP_201_CREATED)
elif rq_job.is_failed:
os.close(rq_job.meta['tmp_file_descriptor'])
if rq_job.meta['tmp_file_descriptor'] is not None:
    os.close(rq_job.meta['tmp_file_descriptor'])
os.remove(rq_job.meta['tmp_file'])
exc_info = str(rq_job.exc_info)
rq_job.delete()
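
A related subtlety in the reworked cleanup: mkstemp() is only called on the legacy multipart path, so tmp_file_descriptor is None for TUS uploads and must be guarded before os.close(), while the file itself is removed in both cases. The contract, roughly (an illustrative helper, not code from this diff):

import os

def cleanup_import_artifacts(meta):
    # meta is the RQ job's meta dict as populated above.
    fd = meta['tmp_file_descriptor']
    if fd is not None:  # None means the file came straight from the TUS upload dir
        os.close(fd)
    os.remove(meta['tmp_file'])
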

@@ -487,6 +487,10 @@ TUS_DEFAULT_CHUNK_SIZE = 104857600 # 100 mb
# How django uses X-Forwarded-Proto - https://docs.djangoproject.com/en/2.2/ref/settings/#secure-proxy-ssl-header
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
# Forwarded host - https://docs.djangoproject.com/en/4.0/ref/settings/#std:setting-USE_X_FORWARDED_HOST
# Used by TUS uploads to provide the correct upload endpoint
USE_X_FORWARDED_HOST = True
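
With this flag enabled, request.get_host() prefers the X-Forwarded-Host header, which keeps build_absolute_uri(), and hence the TUS Location header, pointing at the public URL. A quick illustration of Django's documented behaviour (assumes 'cvat.example.com' passes the ALLOWED_HOSTS check; not CVAT code):

from django.test import RequestFactory

# With USE_X_FORWARDED_HOST = True (as set above), the forwarded host wins.
request = RequestFactory().get('/api/tasks/1/data/',
                               HTTP_X_FORWARDED_HOST='cvat.example.com')
assert request.get_host() == 'cvat.example.com'
assert request.build_absolute_uri() == 'http://cvat.example.com/api/tasks/1/data/'
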
# Django-sendfile requires to set SENDFILE_ROOT
# https://github.com/moggers87/django-sendfile2
SENDFILE_ROOT = BASE_DIR
@@ -528,3 +532,4 @@ SPECTACULAR_SETTINGS = {
# OTHER SETTINGS
# https://drf-spectacular.readthedocs.io/en/latest/settings.html
}

@@ -59,7 +59,7 @@ class TestCLI(APITestCase):
def test_tasks_delete(self):
self.cli.tasks_delete([1])
self.cli.tasks_list(False)
self.assertNotRegex(self.mock_stdout.getvalue(), '.*{}.*'.format(self.taskname))
self.assertRegex(self.mock_stdout.getvalue(), '.*Task ID {} deleted.*'.format(1))
def test_tasks_dump(self):
path = os.path.join(settings.SHARE_ROOT, 'test_cli.xml')
