Tus for backups (#9)

main
Kirill Lakhov 4 years ago committed by GitHub
parent 9c4375d42e
commit cb896ecab2

@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [2.2.0] - Unreleased
 ### Added
 - Support of attributes returned by serverless functions (<https://github.com/cvat-ai/cvat/pull/4>) based on (<https://github.com/openvinotoolkit/cvat/pull/4506>)
+- Project/task backups uploading via chunk uploads (<https://github.com/cvat-ai/cvat/pull/9>)
 ### Changed
 - TDB
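
For orientation before the hunks below: this change switches backup restore from a single multipart POST to resumable chunk uploads via the tus protocol. A minimal sketch of the resulting request flow, in Python, assuming a local CVAT at http://localhost:8080 and a hypothetical auth token; the /api/tasks/backup path and the Upload-Start / Upload-Finish / Upload-Filename headers come from this diff, while Tus-Resumable, Upload-Length, Upload-Offset and Upload-Metadata are standard tus 1.0.0 headers:

    import base64
    import os
    import requests

    URL = 'http://localhost:8080/api/tasks/backup'   # assumed host
    AUTH = {'Authorization': 'Token <token>'}        # hypothetical token

    def upload_backup(path, chunk_size=10 * 1024 * 1024):
        size = os.path.getsize(path)
        # 1. Announce the upload so the server can prepare for tus chunks.
        requests.post(URL, headers={**AUTH, 'Upload-Start': 'true'})
        # 2. Create the tus file resource; the server answers 201 + Location.
        meta = base64.b64encode(os.path.basename(path).encode()).decode()
        r = requests.post(URL + '/', headers={
            **AUTH,
            'Tus-Resumable': '1.0.0',
            'Upload-Length': str(size),
            'Upload-Metadata': f'filename {meta}',
        })
        location = r.headers['Location']
        # 3. PATCH the file chunk by chunk (204 per chunk); the final,
        #    possibly renamed, server-side name arrives in Upload-Filename.
        offset = 0
        with open(path, 'rb') as f:
            while chunk := f.read(chunk_size):
                r = requests.patch(location, data=chunk, headers={
                    **AUTH,
                    'Tus-Resumable': '1.0.0',
                    'Upload-Offset': str(offset),
                    'Content-Type': 'application/offset+octet-stream',
                })
                offset = int(r.headers['Upload-Offset'])
        filename = r.headers['Upload-Filename']
        # 4. Finish: the server enqueues the import and returns 202 + rq_id.
        r = requests.post(URL, params={'filename': filename},
                          headers={**AUTH, 'Upload-Finish': 'true'})
        return r.json()['rq_id']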

@ -1,12 +1,12 @@
{ {
"name": "cvat-core", "name": "cvat-core",
"version": "5.0.2", "version": "5.0.3",
"lockfileVersion": 2, "lockfileVersion": 2,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "cvat-core", "name": "cvat-core",
"version": "5.0.2", "version": "5.0.3",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"axios": "^0.21.4", "axios": "^0.21.4",

@ -1,6 +1,6 @@
{ {
"name": "cvat-core", "name": "cvat-core",
"version": "5.0.2", "version": "5.0.3",
"description": "Part of Computer Vision Tool which presents an interface for client-side integration", "description": "Part of Computer Vision Tool which presents an interface for client-side integration",
"main": "babel.config.js", "main": "babel.config.js",
"scripts": { "scripts": {

@@ -48,15 +48,17 @@
 async function chunkUpload(file, uploadConfig) {
     const params = enableOrganization();
     const {
-        endpoint, chunkSize, totalSize, onUpdate,
+        endpoint, chunkSize, totalSize, onUpdate, metadata,
     } = uploadConfig;
-    let { totalSentSize } = uploadConfig;
+    const { totalSentSize } = uploadConfig;
+    const uploadResult = { totalSentSize };
     return new Promise((resolve, reject) => {
         const upload = new tus.Upload(file, {
             endpoint,
             metadata: {
                 filename: file.name,
                 filetype: file.type,
+                ...metadata,
             },
             headers: {
                 Authorization: Axios.defaults.headers.common.Authorization,
@@ -79,9 +81,13 @@
                     onUpdate(percentage);
                 }
             },
+            onAfterResponse(request, response) {
+                const uploadFilename = response.getHeader('Upload-Filename');
+                if (uploadFilename) uploadResult.filename = uploadFilename;
+            },
             onSuccess() {
-                if (totalSentSize) totalSentSize += file.size;
-                resolve(totalSentSize);
+                if (totalSentSize) uploadResult.totalSentSize += file.size;
+                resolve(uploadResult);
             },
         });
         upload.start();
@@ -705,20 +711,39 @@
     // keep current default params to 'freeze" them during this request
     const params = enableOrganization();

-    let taskData = new FormData();
-    taskData.append('task_file', file);
+    const taskData = new FormData();
+    const uploadConfig = {
+        chunkSize: config.uploadChunkSize * 1024 * 1024,
+        endpoint: `${origin}${backendAPI}/tasks/backup/`,
+        totalSentSize: 0,
+        totalSize: file.size,
+    };
+    const url = `${backendAPI}/tasks/backup`;

+    await Axios.post(url,
+        new FormData(), {
+            params,
+            proxy: config.proxy,
+            headers: { 'Upload-Start': true },
+        });
+    const { filename } = await chunkUpload(file, uploadConfig);
+    let response = await Axios.post(url,
+        new FormData(), {
+            params: { ...params, filename },
+            proxy: config.proxy,
+            headers: { 'Upload-Finish': true },
+        });
     return new Promise((resolve, reject) => {
-        async function request() {
+        async function checkStatus() {
             try {
-                const response = await Axios.post(`${backendAPI}/tasks/backup`, taskData, {
+                taskData.set('rq_id', response.data.rq_id);
+                response = await Axios.post(url, taskData, {
                     proxy: config.proxy,
                     params,
                 });
                 if (response.status === 202) {
-                    taskData = new FormData();
-                    taskData.append('rq_id', response.data.rq_id);
-                    setTimeout(request, 3000);
+                    setTimeout(checkStatus, 3000);
                 } else {
                     // to be able to get the task after it was created, pass frozen params
                     const importedTask = await getTasks({ id: response.data.id, ...params });
@@ -729,7 +754,7 @@
             }
         }

-        setTimeout(request);
+        setTimeout(checkStatus);
     });
 }
@@ -766,19 +791,38 @@
     // keep current default params to 'freeze" them during this request
     const params = enableOrganization();

-    let data = new FormData();
-    data.append('project_file', file);
+    const projectData = new FormData();
+    const uploadConfig = {
+        chunkSize: config.uploadChunkSize * 1024 * 1024,
+        endpoint: `${origin}${backendAPI}/projects/backup/`,
+        totalSentSize: 0,
+        totalSize: file.size,
+    };
+    const url = `${backendAPI}/projects/backup`;

+    await Axios.post(url,
+        new FormData(), {
+            params,
+            proxy: config.proxy,
+            headers: { 'Upload-Start': true },
+        });
+    const { filename } = await chunkUpload(file, uploadConfig);
+    let response = await Axios.post(url,
+        new FormData(), {
+            params: { ...params, filename },
+            proxy: config.proxy,
+            headers: { 'Upload-Finish': true },
+        });
     return new Promise((resolve, reject) => {
         async function request() {
             try {
-                const response = await Axios.post(`${backendAPI}/projects/backup`, data, {
+                projectData.set('rq_id', response.data.rq_id);
+                response = await Axios.post(`${backendAPI}/projects/backup`, projectData, {
                     proxy: config.proxy,
                     params,
                 });
                 if (response.status === 202) {
-                    data = new FormData();
-                    data.append('rq_id', response.data.rq_id);
                     setTimeout(request, 3000);
                 } else {
                     // to be able to get the task after it was created, pass frozen params
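
Both importTask and restoreProject keep the old status-polling loop at the end: the client re-posts the rq_id every 3000 ms while the server answers 202. Sketched standalone in Python (wait_for_import and its arguments are illustrative names, not part of the diff; the real client sends the rq_id as form data):

    import time
    import requests

    def wait_for_import(url, headers, rq_id, delay=3.0):
        # Mirrors the 3000 ms setTimeout loop above: 202 means the RQ job
        # is still running; anything else ends the wait.
        while True:
            r = requests.post(url, headers=headers, data={'rq_id': rq_id})
            if r.status_code != 202:
                r.raise_for_status()
                return r.json()['id']   # id of the imported project/task
            time.sleep(delay)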

@@ -749,19 +749,21 @@ def export(db_instance, request):
         result_ttl=ttl, failure_ttl=ttl)
     return Response(status=status.HTTP_202_ACCEPTED)

-def _import(importer, request, rq_id, Serializer, file_field_name):
+def _import(importer, request, rq_id, Serializer, file_field_name, filename=None):
     queue = django_rq.get_queue("default")
     rq_job = queue.fetch_job(rq_id)

     if not rq_job:
-        serializer = Serializer(data=request.data)
-        serializer.is_valid(raise_exception=True)
-        payload_file = serializer.validated_data[file_field_name]
         org_id = getattr(request.iam_context['organization'], 'id', None)
-        fd, filename = mkstemp(prefix='cvat_')
-        with open(filename, 'wb+') as f:
-            for chunk in payload_file.chunks():
-                f.write(chunk)
+        fd = None
+        if not filename:
+            serializer = Serializer(data=request.data)
+            serializer.is_valid(raise_exception=True)
+            payload_file = serializer.validated_data[file_field_name]
+            fd, filename = mkstemp(prefix='cvat_')
+            with open(filename, 'wb+') as f:
+                for chunk in payload_file.chunks():
+                    f.write(chunk)
         rq_job = queue.enqueue_call(
             func=importer,
             args=(filename, request.user.id, org_id),
@@ -774,12 +776,12 @@ def _import(importer, request, rq_id, Serializer, file_field_name):
     else:
         if rq_job.is_finished:
             project_id = rq_job.return_value
-            os.close(rq_job.meta['tmp_file_descriptor'])
+            if rq_job.meta['tmp_file_descriptor']: os.close(rq_job.meta['tmp_file_descriptor'])
             os.remove(rq_job.meta['tmp_file'])
             rq_job.delete()
             return Response({'id': project_id}, status=status.HTTP_201_CREATED)
         elif rq_job.is_failed:
-            os.close(rq_job.meta['tmp_file_descriptor'])
+            if rq_job.meta['tmp_file_descriptor']: os.close(rq_job.meta['tmp_file_descriptor'])
             os.remove(rq_job.meta['tmp_file'])
             exc_info = str(rq_job.exc_info)
             rq_job.delete()
@@ -797,7 +799,10 @@ def _import(importer, request, rq_id, Serializer, file_field_name):
     return Response({'rq_id': rq_id}, status=status.HTTP_202_ACCEPTED)

-def import_project(request):
+def get_backup_dirname():
+    return settings.TMP_FILES_ROOT
+
+def import_project(request, filename=None):
     if 'rq_id' in request.data:
         rq_id = request.data['rq_id']
     else:
@@ -811,9 +816,10 @@ def import_project(request):
         rq_id=rq_id,
         Serializer=Serializer,
         file_field_name=file_field_name,
+        filename=filename
     )

-def import_task(request):
+def import_task(request, filename=None):
     if 'rq_id' in request.data:
         rq_id = request.data['rq_id']
     else:
@@ -827,4 +833,5 @@ def import_task(request):
         rq_id=rq_id,
         Serializer=Serializer,
         file_field_name=file_field_name,
+        filename=filename
     )
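
The net effect of the new filename parameter in _import: for tus uploads the file already sits in TMP_FILES_ROOT, so the multipart deserialization is skipped and no temp descriptor exists, which is why the cleanup branches now guard os.close. A condensed restatement under assumed names (not CVAT code; request_file stands in for a Django UploadedFile):

    import os
    from tempfile import mkstemp

    def resolve_backup_file(request_file, filename=None):
        fd = None
        if not filename:                      # legacy multipart upload
            fd, filename = mkstemp(prefix='cvat_')
            with open(filename, 'wb+') as f:
                for chunk in request_file.chunks():
                    f.write(chunk)
        return fd, filename                   # fd stays None for tus uploads

    def cleanup(meta):
        # 'tmp_file_descriptor' is falsy for tus uploads, so only the
        # file itself is removed in that case.
        if meta['tmp_file_descriptor']:
            os.close(meta['tmp_file_descriptor'])
        os.remove(meta['tmp_file'])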

@@ -45,7 +45,12 @@ class TusFile:
         file_path = os.path.join(self.upload_dir, self.filename)
         file_exists = os.path.lexists(os.path.join(self.upload_dir, self.filename))
         if file_exists:
-            raise FileExistsError("File {} is already uploaded".format(self.filename))
+            original_file_name, extension = os.path.splitext(self.filename)
+            file_amount = 1
+            while os.path.lexists(os.path.join(self.upload_dir, self.filename)):
+                self.filename = "{}_{}{}".format(original_file_name, file_amount, extension)
+                file_path = os.path.join(self.upload_dir, self.filename)
+                file_amount += 1
         os.rename(file_id_path, file_path)

     def clean(self):
@@ -66,7 +71,8 @@ class TusFile:
     @staticmethod
     def create_file(metadata, file_size, upload_dir):
         file_id = str(uuid.uuid4())
-        cache.add("tus-uploads/{}/filename".format(file_id), "{}".format(metadata.get("filename")), TusFile._tus_cache_timeout)
+        filename = metadata.get("filename")
+        cache.add("tus-uploads/{}/filename".format(file_id), "{}".format(filename), TusFile._tus_cache_timeout)
         cache.add("tus-uploads/{}/file_size".format(file_id), file_size, TusFile._tus_cache_timeout)
         cache.add("tus-uploads/{}/offset".format(file_id), 0, TusFile._tus_cache_timeout)
         cache.add("tus-uploads/{}/metadata".format(file_id), metadata, TusFile._tus_cache_timeout)
@@ -175,7 +181,8 @@ class UploadMixin(object):
         location = request.META.get('HTTP_ORIGIN') + request.META.get('PATH_INFO')
         return self._tus_response(
             status=status.HTTP_201_CREATED,
-            extra_headers={'Location': '{}{}'.format(location, tus_file.file_id)})
+            extra_headers={'Location': '{}{}'.format(location, tus_file.file_id),
+                           'Upload-Filename': tus_file.filename})

     def append_tus_chunk(self, request, file_id):
         if request.method == 'HEAD':
@@ -202,7 +209,8 @@ class UploadMixin(object):
             tus_file.clean()

         return self._tus_response(status=status.HTTP_204_NO_CONTENT,
-            extra_headers={'Upload-Offset': tus_file.offset})
+            extra_headers={'Upload-Offset': tus_file.offset,
+                           'Upload-Filename': tus_file.filename})

     def validate_filename(self, filename):
         upload_dir = self.get_upload_dir()
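
Instead of rejecting a duplicate name with FileExistsError, TusFile now probes name_1.ext, name_2.ext, ... until a free slot is found, and the final name is echoed to the client in the new Upload-Filename header (which chunkUpload reads in onAfterResponse above). The same idea as a standalone helper, a sketch rather than the mixin itself:

    import os

    def dedup_filename(upload_dir: str, filename: str) -> str:
        # 'backup.zip' -> 'backup_1.zip' -> 'backup_2.zip' -> ...
        name, ext = os.path.splitext(filename)
        counter = 1
        while os.path.lexists(os.path.join(upload_dir, filename)):
            filename = f'{name}_{counter}{ext}'
            counter += 1
        return filename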

@@ -374,9 +374,16 @@ class ProjectViewSet(viewsets.ModelViewSet, UploadMixin):
             filename=request.query_params.get("filename", "").lower(),
         )

+    @action(detail=True, methods=['HEAD', 'PATCH'], url_path='dataset/'+UploadMixin.file_id_regex)
+    def append_dataset_chunk(self, request, pk, file_id):
+        self._object = self.get_object()
+        return self.append_tus_chunk(request, file_id)
+
     def get_upload_dir(self):
         if 'dataset' in self.action:
             return self._object.get_tmp_dirname()
+        elif 'backup' in self.action:
+            return backup.get_backup_dirname()
         return ""

     def upload_finished(self, request):
@@ -395,14 +402,19 @@ class ProjectViewSet(viewsets.ModelViewSet, UploadMixin):
                 pk=self._object.pk,
                 format_name=format_name,
             )
+        elif self.action == 'import_backup':
+            filename = request.query_params.get("filename", "")
+            if filename:
+                tmp_dir = backup.get_backup_dirname()
+                backup_file = os.path.join(tmp_dir, filename)
+                if os.path.isfile(backup_file):
+                    return backup.import_project(request, filename=backup_file)
+                return Response(data='No such file were uploaded',
+                    status=status.HTTP_400_BAD_REQUEST)
+            return backup.import_project(request)
         return Response(data='Unknown upload was finished',
             status=status.HTTP_400_BAD_REQUEST)

-    @action(detail=True, methods=['HEAD', 'PATCH'], url_path='dataset/'+UploadMixin.file_id_regex)
-    def append_dataset_chunk(self, request, pk, file_id):
-        self._object = self.get_object()
-        return self.append_tus_chunk(request, file_id)
-
     @extend_schema(summary='Method allows to download project annotations',
         parameters=[
             OpenApiParameter('format', description='Desired output format name\n'
@@ -453,9 +465,13 @@ class ProjectViewSet(viewsets.ModelViewSet, UploadMixin):
         '201': OpenApiResponse(description='The project has been imported'), # or better specify {id: project_id}
         '202': OpenApiResponse(description='Importing a backup file has been started'),
     })
-    @action(detail=False, methods=['POST'], url_path='backup')
+    @action(detail=False, methods=['OPTIONS', 'POST'], url_path=r'backup/?$')
     def import_backup(self, request, pk=None):
-        return backup.import_project(request)
+        return self.upload_data(request)
+
+    @action(detail=False, methods=['HEAD', 'PATCH'], url_path='backup/'+UploadMixin.file_id_regex)
+    def append_backup_chunk(self, request, file_id):
+        return self.append_tus_chunk(request, file_id)

     @staticmethod
     def _get_rq_response(queue, job_id):
@@ -607,9 +623,13 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
         '201': OpenApiResponse(description='The task has been imported'), # or better specify {id: task_id}
         '202': OpenApiResponse(description='Importing a backup file has been started'),
     })
-    @action(detail=False, methods=['POST'], url_path='backup')
+    @action(detail=False, methods=['OPTIONS', 'POST'], url_path=r'backup/?$')
     def import_backup(self, request, pk=None):
-        return backup.import_task(request)
+        return self.upload_data(request)
+
+    @action(detail=False, methods=['HEAD', 'PATCH'], url_path='backup/'+UploadMixin.file_id_regex)
+    def append_backup_chunk(self, request, file_id):
+        return self.append_tus_chunk(request, file_id)

     @extend_schema(summary='Method backup a specified task',
         responses={
@@ -668,6 +688,8 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
             return self._object.get_tmp_dirname()
         elif 'data' in self.action:
             return self._object.data.get_upload_dirname()
+        elif 'backup' in self.action:
+            return backup.get_backup_dirname()
         return ""

     # UploadMixin method
@@ -721,6 +743,16 @@ class TaskViewSet(UploadMixin, viewsets.ModelViewSet):
             data['stop_frame'] = None
             task.create(self._object.id, data)
             return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
+        elif self.action == 'import_backup':
+            filename = request.query_params.get("filename", "")
+            if filename:
+                tmp_dir = backup.get_backup_dirname()
+                backup_file = os.path.join(tmp_dir, filename)
+                if os.path.isfile(backup_file):
+                    return backup.import_task(request, filename=backup_file)
+                return Response(data='No such file were uploaded',
+                    status=status.HTTP_400_BAD_REQUEST)
+            return backup.import_task(request)
         return Response(data='Unknown upload was finished',
             status=status.HTTP_400_BAD_REQUEST)

@@ -498,6 +498,8 @@ class ProjectPermission(OpenPolicyAgentPermission):
             ('dataset', 'GET'): 'export:dataset',
             ('export_backup', 'GET'): 'export:backup',
             ('import_backup', 'POST'): 'import:backup',
+            ('append_backup_chunk', 'PATCH'): 'import:backup',
+            ('append_backup_chunk', 'HEAD'): 'import:backup',
         }.get((view.action, request.method))

         scopes = []
@@ -656,6 +658,8 @@ class TaskPermission(OpenPolicyAgentPermission):
             ('append_data_chunk', 'HEAD'): 'upload:data',
             ('jobs', 'GET'): 'view',
             ('import_backup', 'POST'): 'import:backup',
+            ('append_backup_chunk', 'PATCH'): 'import:backup',
+            ('append_backup_chunk', 'HEAD'): 'import:backup',
             ('export_backup', 'GET'): 'export:backup',
         }.get((view.action, request.method))

@@ -372,6 +372,9 @@ os.makedirs(MIGRATIONS_LOGS_ROOT, exist_ok=True)
 CLOUD_STORAGE_ROOT = os.path.join(DATA_ROOT, 'storages')
 os.makedirs(CLOUD_STORAGE_ROOT, exist_ok=True)

+TMP_FILES_ROOT = os.path.join(DATA_ROOT, 'tmp')
+os.makedirs(TMP_FILES_ROOT, exist_ok=True)
+
 LOGGING = {
     'version': 1,
     'disable_existing_loggers': False,

@@ -93,9 +93,13 @@ context('Export, import an annotation task.', { browser: '!firefox' }, () => {
     });

     it('Import the task. Check id, labels, shape.', () => {
-        cy.intercept('POST', '/api/tasks/backup?**').as('importTask');
+        cy.intercept({ method: /PATCH|POST/, url: /\/api\/tasks\/backup.*/ }).as('importTask');
         cy.get('.cvat-create-task-dropdown').click();
         cy.get('.cvat-import-task').click().find('input[type=file]').attachFile(taskBackupArchiveFullName);
+        cy.wait('@importTask').its('response.statusCode').should('equal', 202);
+        cy.wait('@importTask').its('response.statusCode').should('equal', 201);
+        cy.wait('@importTask').its('response.statusCode').should('equal', 204);
+        cy.wait('@importTask').its('response.statusCode').should('equal', 202);
         cy.wait('@importTask', { timeout: 5000 }).its('response.statusCode').should('equal', 202);
         cy.wait('@importTask').its('response.statusCode').should('equal', 201);
         cy.contains('Task has been imported succesfully').should('exist').and('be.visible');

@@ -140,9 +140,13 @@ Cypress.Commands.add('backupProject', (projectName) => {
 });

 Cypress.Commands.add('restoreProject', (archiveWithBackup) => {
-    cy.intercept('POST', '/api/projects/backup?**').as('restoreProject');
+    cy.intercept({ method: /PATCH|POST/, url: /\/api\/projects\/backup.*/ }).as('restoreProject');
     cy.get('.cvat-create-project-dropdown').click();
     cy.get('.cvat-import-project').click().find('input[type=file]').attachFile(archiveWithBackup);
+    cy.wait('@restoreProject').its('response.statusCode').should('equal', 202);
+    cy.wait('@restoreProject').its('response.statusCode').should('equal', 201);
+    cy.wait('@restoreProject').its('response.statusCode').should('equal', 204);
+    cy.wait('@restoreProject').its('response.statusCode').should('equal', 202);
     cy.wait('@restoreProject', { timeout: 5000 }).its('response.statusCode').should('equal', 202);
     cy.wait('@restoreProject').its('response.statusCode').should('equal', 201);
     cy.contains('Project has been created succesfully')
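
The four added waits plus the two pre-existing ones trace the whole upload protocol as the browser sees it. As a plain data sketch, with step labels inferred from this diff rather than asserted by the tests themselves:

    EXPECTED_BACKUP_RESTORE_FLOW = [
        ('POST',  202, 'Upload-Start announcement accepted'),
        ('POST',  201, 'tus file resource created'),
        ('PATCH', 204, 'chunk appended (one wait; small test archive)'),
        ('POST',  202, 'Upload-Finish, import job enqueued'),
        ('POST',  202, 'rq_id poll, job still running'),
        ('POST',  201, 'import finished, new id returned'),
    ]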
