Source & target storage tests (#56)
parent edaa57fd7b
commit 57bc0e9c90
Binary file not shown.
@@ -1,4 +1,5 @@
pytest==6.2.5
requests==2.26.0
deepdiff==5.6.0
boto3==1.17.61
Pillow==9.0.1

@@ -0,0 +1,244 @@
import pytest
import boto3
import functools
import json

from botocore.exceptions import ClientError
from http import HTTPStatus

from .utils.config import (
    get_method, post_method, MINIO_KEY, MINIO_SECRET_KEY, MINIO_ENDPOINT_URL,
)

FILENAME_TEMPLATE = 'cvat/{}/{}.zip'
FORMAT = 'COCO 1.0'

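# Helpers that build the request parameters for the export/import endpoints:
# either an explicitly selected cloud storage (location='cloud_storage') or
# the default storage configured on the project/task.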
def _use_custom_settings(obj, resource, cloud_storage_id):
    return {
        'filename': FILENAME_TEMPLATE.format(obj, resource),
        'use_default_location': False,
        'location': 'cloud_storage',
        'cloud_storage_id': cloud_storage_id,
        'format': FORMAT,
    }

def _use_default_settings(obj, resource):
    return {
        'filename': FILENAME_TEMPLATE.format(obj, resource),
        'use_default_location': True,
        'format': FORMAT,
    }

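# The test cloud storages are backed by MinIO, so a boto3 S3 client pointed
# at the MinIO endpoint can verify and clean up exported files directly.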
def define_client():
    s3 = boto3.resource(
        's3',
        aws_access_key_id=MINIO_KEY,
        aws_secret_access_key=MINIO_SECRET_KEY,
        endpoint_url=MINIO_ENDPOINT_URL,
    )
    return s3.meta.client

def assert_file_does_not_exist(client, bucket, filename):
    try:
        client.head_object(Bucket=bucket, Key=filename)
        raise AssertionError(f'File {filename} on bucket {bucket} already exists')
    except ClientError:
        pass

def assert_file_exists(client, bucket, filename):
    try:
        client.head_object(Bucket=bucket, Key=filename)
    except ClientError:
        raise AssertionError(f"File {filename} on bucket {bucket} doesn't exist")

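# Decorator for the export helper below: checks that the target file is
# absent from the bucket before the export and present on it afterwards.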
def assert_file_status(func):
    @functools.wraps(func)
    def wrapper(user, storage_conf, *args, **kwargs):
        filename = kwargs['filename']
        bucket = storage_conf['resource']
        # get storage client
        client = define_client()
        # check that file doesn't exist on the bucket
        assert_file_does_not_exist(client, bucket, filename)
        func(user, storage_conf, *args, **kwargs)
        # check that file exists on the bucket
        assert_file_exists(client, bucket, filename)
    return wrapper

def remove_asset(bucket, filename):
    client = define_client()
    client.delete_object(Bucket=bucket, Key=filename)

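# The export endpoints answer 202 Accepted while the file is being prepared
# and 201 Created once it is ready; the helper keeps polling with
# action='download' until the request returns 200 OK.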
@assert_file_status
def _save_resource_to_cloud_storage(user, storage_conf, obj_id, obj, resource, **kwargs):
    response = get_method(user, f'{obj}/{obj_id}/{resource}', **kwargs)
    status = response.status_code

    while status != HTTPStatus.OK:
        assert status in (HTTPStatus.CREATED, HTTPStatus.ACCEPTED)
        response = get_method(user, f'{obj}/{obj_id}/{resource}', action='download', **kwargs)
        status = response.status_code

def _idempotent_saving_resource_to_cloud_storage(*args, **kwargs):
    _save_resource_to_cloud_storage(*args, **kwargs)
    remove_asset(args[1]['resource'], kwargs['filename'])

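# Export tests: save annotations, datasets and backups of projects, tasks and
# jobs to a cloud storage and check that the expected file appears on the
# bucket (the uploaded file is removed again afterwards).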
@pytest.mark.usefixtures('dontchangedb')
class TestSaveResource:
    _USERNAME = 'admin1'
    _ORG = 2

    @pytest.mark.parametrize('cloud_storage_id', [3])
    @pytest.mark.parametrize('obj_id, obj, resource', [
        (2, 'projects', 'annotations'),
        (2, 'projects', 'dataset'),
        (2, 'projects', 'backup'),
        (11, 'tasks', 'annotations'),
        (11, 'tasks', 'dataset'),
        (11, 'tasks', 'backup'),
        (16, 'jobs', 'annotations'),
        (16, 'jobs', 'dataset'),
    ])
    def test_save_resource_to_cloud_storage_with_specific_location(
        self, cloud_storage_id, obj_id, obj, resource, cloud_storages
    ):
        cloud_storage = cloud_storages[cloud_storage_id]
        kwargs = _use_custom_settings(obj, resource, cloud_storage_id)
        if resource == 'backup':
            kwargs.pop('format')

        _idempotent_saving_resource_to_cloud_storage(self._USERNAME, cloud_storage,
            obj_id, obj, resource, org_id=self._ORG, **kwargs)

    @pytest.mark.parametrize('obj_id, obj, resource', [
        (2, 'projects', 'annotations'),
        (2, 'projects', 'dataset'),
        (2, 'projects', 'backup'),
        (11, 'tasks', 'annotations'),
        (11, 'tasks', 'dataset'),
        (11, 'tasks', 'backup'),
        (16, 'jobs', 'annotations'),
        (16, 'jobs', 'dataset'),
    ])
    def test_save_resource_to_cloud_storage_with_default_location(
        self, obj_id, obj, resource, projects, tasks, jobs, cloud_storages,
    ):
        objects = {
            'projects': projects,
            'tasks': tasks,
            'jobs': jobs,
        }
        if obj in ('projects', 'tasks'):
            cloud_storage_id = objects[obj][obj_id]['target_storage']['cloud_storage_id']
        else:
            task_id = jobs[obj_id]['task_id']
            cloud_storage_id = tasks[task_id]['target_storage']['cloud_storage_id']
        cloud_storage = cloud_storages[cloud_storage_id]

        kwargs = _use_default_settings(obj, resource)

        if resource == 'backup':
            kwargs.pop('format')

        _idempotent_saving_resource_to_cloud_storage(self._USERNAME, cloud_storage,
            obj_id, obj, resource, org_id=self._ORG, **kwargs)

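# Import helpers: the import endpoints answer 202 Accepted until the
# server-side import finishes with 201 Created. Backup imports echo the JSON
# body of the previous response back on the follow-up request, and dataset
# imports are polled via action='import_status'.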
def _import_annotations_from_cloud_storage(user, obj_id, obj, **kwargs):
    url = f'{obj}/{obj_id}/annotations'
    response = post_method(user, url, data=None, **kwargs)
    status = response.status_code

    while status != HTTPStatus.CREATED:
        assert status == HTTPStatus.ACCEPTED
        response = post_method(user, url, data=None, **kwargs)
        status = response.status_code

def _import_backup_from_cloud_storage(user, obj_id, obj, **kwargs):
    url = f'{obj}/backup'
    response = post_method(user, url, data=None, **kwargs)
    status = response.status_code

    while status != HTTPStatus.CREATED:
        assert status == HTTPStatus.ACCEPTED
        data = json.loads(response.content.decode('utf8'))
        response = post_method(user, url, data=data, **kwargs)
        status = response.status_code

def _import_dataset_from_cloud_storage(user, obj_id, obj, **kwargs):
    url = f'{obj}/{obj_id}/dataset'
    response = post_method(user, url, data=None, **kwargs)
    status = response.status_code

    while status != HTTPStatus.CREATED:
        assert status == HTTPStatus.ACCEPTED
        response = get_method(user, url, action='import_status')
        status = response.status_code

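# Import tests: first export a resource to the cloud storage with the export
# helper above, then import it back from the same location and remove the
# uploaded file from the bucket afterwards.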
@pytest.mark.usefixtures('changedb')
@pytest.mark.usefixtures('restore_cvat_data')
class TestImportResource:
    _USERNAME = 'admin1'
    _ORG = 2

    @pytest.mark.parametrize('cloud_storage_id', [3])
    @pytest.mark.parametrize('obj_id, obj, resource', [
        (2, 'projects', 'dataset'),
        (2, 'projects', 'backup'),
        (11, 'tasks', 'annotations'),
        (11, 'tasks', 'backup'),
        (16, 'jobs', 'annotations'),
    ])
    def test_import_resource_from_cloud_storage_with_specific_location(
        self, cloud_storage_id, obj_id, obj, resource, cloud_storages
    ):
        cloud_storage = cloud_storages[cloud_storage_id]
        kwargs = _use_custom_settings(obj, resource, cloud_storage_id)
        export_kwargs = _use_custom_settings(obj, resource, cloud_storage_id)

        if resource == 'backup':
            kwargs.pop('format')
            kwargs.pop('use_default_location')
            export_kwargs.pop('format')

        # export current resource to cloud storage
        _save_resource_to_cloud_storage(self._USERNAME, cloud_storage,
            obj_id, obj, resource, org_id=self._ORG, **export_kwargs)

        import_resource = {
            'annotations': _import_annotations_from_cloud_storage,
            'dataset': _import_dataset_from_cloud_storage,
            'backup': _import_backup_from_cloud_storage,
        }
        import_resource[resource](self._USERNAME, obj_id, obj, org_id=self._ORG, **kwargs)
        remove_asset(cloud_storage['resource'], kwargs['filename'])

    @pytest.mark.parametrize('obj_id, obj, resource', [
        (2, 'projects', 'dataset'),
        (11, 'tasks', 'annotations'),
        (16, 'jobs', 'annotations'),
    ])
    def test_import_resource_from_cloud_storage_with_default_location(
        self, obj_id, obj, resource, projects, tasks, jobs, cloud_storages,
    ):
        objects = {
            'projects': projects,
            'tasks': tasks,
            'jobs': jobs,
        }
        if obj in ('projects', 'tasks'):
            cloud_storage_id = objects[obj][obj_id]['source_storage']['cloud_storage_id']
        else:
            task_id = jobs[obj_id]['task_id']
            cloud_storage_id = tasks[task_id]['source_storage']['cloud_storage_id']
        cloud_storage = cloud_storages[cloud_storage_id]
        kwargs = _use_default_settings(obj, resource)

        # export current resource to cloud storage
        _save_resource_to_cloud_storage(self._USERNAME, cloud_storage,
            obj_id, obj, resource, org_id=self._ORG, **kwargs)

        import_resource = {
            'annotations': _import_annotations_from_cloud_storage,
            'dataset': _import_dataset_from_cloud_storage,
            'backup': _import_backup_from_cloud_storage,
        }
        import_resource[resource](self._USERNAME, obj_id, obj, org_id=self._ORG, **kwargs)
        remove_asset(cloud_storage['resource'], kwargs['filename'])