Refactor resource import export tests (#5429)

Extracted some enhancements from
https://github.com/opencv/cvat/pull/4819

- Extracted common s3 manipulations in tests
- Refactored import/export tests to be more clear
main
Maxim Zhiltsov 3 years ago committed by GitHub
parent 0a032b3236
commit 82adde42aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,109 +5,131 @@
import functools
import json
from contextlib import ExitStack
from http import HTTPStatus
from typing import Any, Dict, TypeVar
import boto3
import pytest
from botocore.exceptions import ClientError
from shared.utils.config import (
MINIO_ENDPOINT_URL,
MINIO_KEY,
MINIO_SECRET_KEY,
get_method,
post_method,
)
from shared.utils.config import get_method, post_method
from shared.utils.s3 import make_client
T = TypeVar("T")
FILENAME_TEMPLATE = "cvat/{}/{}.zip"
FORMAT = "COCO 1.0"
def _use_custom_settings(obj, resource, cloud_storage_id):
return {
def _make_custom_resource_params(obj: str, resource: str, cloud_storage_id: int) -> Dict[str, Any]:
params = {
"filename": FILENAME_TEMPLATE.format(obj, resource),
"use_default_location": False,
"location": "cloud_storage",
"cloud_storage_id": cloud_storage_id,
"format": FORMAT,
"use_default_location": False,
}
if resource != "backup":
params["format"] = FORMAT
return params
def _make_default_resource_params(obj: str, resource: str) -> Dict[str, Any]:
    """Build request params for exporting/importing a resource via the
    object's default (project/task-associated) storage location.

    obj: API object kind ("projects", "tasks", "jobs");
    resource: "annotations", "dataset" or "backup".
    """
    params = {
        "filename": FILENAME_TEMPLATE.format(obj, resource),
        "use_default_location": True,
    }
    # Backups are format-less archives; every other resource needs a format.
    if resource != "backup":
        params["format"] = FORMAT
    return params
def define_client():
    """Create a low-level boto3 S3 client for the test MinIO server.

    NOTE(review): superseded by shared.utils.s3.make_client in the refactored
    tests; kept here only as it appears in the pre-refactoring code.
    """
    s3 = boto3.resource(
        "s3",
        aws_access_key_id=MINIO_KEY,
        aws_secret_access_key=MINIO_SECRET_KEY,
        endpoint_url=MINIO_ENDPOINT_URL,
    )
    # boto3 "resource" wraps a client; return the underlying client object
    return s3.meta.client
class _S3ResourceTest:
    """Common setup for tests that export/import resources through S3."""

    @pytest.fixture(autouse=True)
    def setup(self, admin_user: str):
        # All requests are made as the admin user unless overridden per call.
        self.user = admin_user
        self.s3_client = make_client()
        # Cleanup callbacks (e.g. removing uploaded files) are registered on
        # this stack during the test and run when the fixture is torn down.
        self.exit_stack = ExitStack()
        with self.exit_stack:
            yield
def _ensure_file_created(self, func: T, storage: Dict[str, Any]) -> T:
    """Wrap *func* (which must be called with a ``filename`` kwarg) with
    assertions that the call creates exactly that file in the bucket of
    *storage* — absent before the call, present after it.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        filename = kwargs["filename"]
        bucket = storage["resource"]

        # check that file doesn't exist on the bucket
        assert not self.s3_client.file_exists(bucket, filename)

        func(*args, **kwargs)

        # check that file exists on the bucket
        assert self.s3_client.file_exists(bucket, filename)

    return wrapper
def assert_file_status(func):
@functools.wraps(func)
def wrapper(user, storage_conf, *args, **kwargs):
filename = kwargs["filename"]
bucket = storage_conf["resource"]
# get storage client
client = define_client()
# check that file doesn't exist on the bucket
assert_file_does_not_exist(client, bucket, filename)
func(user, storage_conf, *args, **kwargs)
# check that file exists on the bucket
assert_file_exists(client, bucket, filename)
def _export_resource_to_cloud_storage(
    self, obj_id: int, obj: str, resource: str, *, user: str, **kwargs
):
    """Request an export of *resource* for ``{obj}/{obj_id}`` and poll until
    the server reports the file has been saved to the cloud storage.

    First response may be 201/202 while the export is being prepared; keep
    polling with ``action=download`` until 200 OK.
    """
    response = get_method(user, f"{obj}/{obj_id}/{resource}", **kwargs)
    status = response.status_code

    while status != HTTPStatus.OK:
        assert status in (HTTPStatus.CREATED, HTTPStatus.ACCEPTED)
        response = get_method(user, f"{obj}/{obj_id}/{resource}", action="download", **kwargs)
        status = response.status_code
def _import_annotations_from_cloud_storage(self, obj_id, obj, *, user, **kwargs):
    """Start an annotations import for ``{obj}/{obj_id}`` from cloud storage
    and poll (by re-posting) until the server reports 201 Created.
    """
    url = f"{obj}/{obj_id}/annotations"
    response = post_method(user, url, data=None, **kwargs)
    status = response.status_code

    # 202 Accepted means the import is still in progress
    while status != HTTPStatus.CREATED:
        assert status == HTTPStatus.ACCEPTED
        response = post_method(user, url, data=None, **kwargs)
        status = response.status_code
def _import_backup_from_cloud_storage(self, obj_id, obj, *, user, **kwargs):
    """Start a backup import for *obj* ("projects"/"tasks") from cloud
    storage and poll until the server reports 201 Created.

    While the import is in progress (202 Accepted), the server returns a
    body (e.g. an rq_id) that must be echoed back on the next poll request.
    """
    url = f"{obj}/backup"
    response = post_method(user, url, data=None, **kwargs)
    status = response.status_code

    while status != HTTPStatus.CREATED:
        assert status == HTTPStatus.ACCEPTED
        data = json.loads(response.content.decode("utf8"))
        response = post_method(user, url, data=data, **kwargs)
        status = response.status_code
def _import_dataset_from_cloud_storage(self, obj_id, obj, *, user, **kwargs):
    """Kick off a dataset import for ``{obj}/{obj_id}`` from cloud storage,
    then poll ``action=import_status`` until the server answers 201 Created.
    """
    url = f"{obj}/{obj_id}/dataset"
    response = post_method(user, url, data=None, **kwargs)

    # Anything other than 201 must be 202 (import still running); keep polling.
    while (status := response.status_code) != HTTPStatus.CREATED:
        assert status == HTTPStatus.ACCEPTED
        response = get_method(user, url, action="import_status")
def _idempotent_saving_resource_to_cloud_storage(*args, **kwargs):
    """Save a resource to cloud storage, then delete the uploaded file so the
    test leaves the bucket in its original state.

    NOTE(review): pre-refactoring helper; args[1] is the cloud storage dict
    and its "resource" key is the bucket name.
    """
    _save_resource_to_cloud_storage(*args, **kwargs)
    remove_asset(args[1]["resource"], kwargs["filename"])
def _export_resource(self, cloud_storage: Dict[str, Any], *args, **kwargs):
    """Export a resource into *cloud_storage*, asserting that the target file
    is newly created, and schedule its removal at fixture teardown.
    """
    # Inherit the storage's organization context unless the caller set one.
    org_id = cloud_storage["organization"]
    if org_id:
        kwargs.setdefault("org_id", org_id)
    kwargs.setdefault("user", self.user)

    # Wrap the export with before/after existence checks on the bucket.
    export_callback = self._ensure_file_created(
        self._export_resource_to_cloud_storage, storage=cloud_storage
    )
    export_callback(*args, **kwargs)

    # Clean up the uploaded file when the test's ExitStack unwinds.
    self.exit_stack.callback(
        self.s3_client.remove_file,
        bucket=cloud_storage["resource"],
        filename=kwargs["filename"],
    )
@pytest.mark.usefixtures("restore_db_per_class")
class TestSaveResource:
_USERNAME = "admin1"
_ORG = 2
@pytest.mark.usefixtures("restore_db_per_class")
class TestExportResource(_S3ResourceTest):
@pytest.mark.parametrize("cloud_storage_id", [3])
@pytest.mark.parametrize(
"obj_id, obj, resource",
@ -126,13 +148,9 @@ class TestSaveResource:
self, cloud_storage_id, obj_id, obj, resource, cloud_storages
):
cloud_storage = cloud_storages[cloud_storage_id]
kwargs = _use_custom_settings(obj, resource, cloud_storage_id)
if resource == "backup":
kwargs.pop("format")
kwargs = _make_custom_resource_params(obj, resource, cloud_storage_id)
_idempotent_saving_resource_to_cloud_storage(
self._USERNAME, cloud_storage, obj_id, obj, resource, org_id=self._ORG, **kwargs
)
self._export_resource(cloud_storage, obj_id, obj, resource, **kwargs)
@pytest.mark.parametrize(
"obj_id, obj, resource",
@ -168,56 +186,28 @@ class TestSaveResource:
task_id = jobs[obj_id]["task_id"]
cloud_storage_id = tasks[task_id]["target_storage"]["cloud_storage_id"]
cloud_storage = cloud_storages[cloud_storage_id]
kwargs = _make_default_resource_params(obj, resource)
kwargs = _use_default_settings(obj, resource)
if resource == "backup":
kwargs.pop("format")
_idempotent_saving_resource_to_cloud_storage(
self._USERNAME, cloud_storage, obj_id, obj, resource, org_id=self._ORG, **kwargs
)
def _import_annotations_from_cloud_storage(user, obj_id, obj, **kwargs):
url = f"{obj}/{obj_id}/annotations"
response = post_method(user, url, data=None, **kwargs)
status = response.status_code
while status != HTTPStatus.CREATED:
assert status == HTTPStatus.ACCEPTED
response = post_method(user, url, data=None, **kwargs)
status = response.status_code
self._export_resource(cloud_storage, obj_id, obj, resource, **kwargs)
def _import_backup_from_cloud_storage(user, obj_id, obj, **kwargs):
url = f"{obj}/backup"
response = post_method(user, url, data=None, **kwargs)
status = response.status_code
while status != HTTPStatus.CREATED:
assert status == HTTPStatus.ACCEPTED
data = json.loads(response.content.decode("utf8"))
response = post_method(user, url, data=data, **kwargs)
status = response.status_code
def _import_dataset_from_cloud_storage(user, obj_id, obj, **kwargs):
url = f"{obj}/{obj_id}/dataset"
response = post_method(user, url, data=None, **kwargs)
status = response.status_code
@pytest.mark.usefixtures("restore_db_per_function")
@pytest.mark.usefixtures("restore_cvat_data")
class TestImportResource(_S3ResourceTest):
def _import_resource(self, cloud_storage: Dict[str, Any], resource_type: str, *args, **kwargs):
    """Import a resource of *resource_type* from *cloud_storage*, dispatching
    to the matching importer ("annotations", "dataset" or "backup").

    Raises KeyError for an unknown resource_type.
    """
    methods = {
        "annotations": self._import_annotations_from_cloud_storage,
        "dataset": self._import_dataset_from_cloud_storage,
        "backup": self._import_backup_from_cloud_storage,
    }

    # Inherit the storage's organization context unless the caller set one.
    org_id = cloud_storage["organization"]
    if org_id:
        kwargs.setdefault("org_id", org_id)
    kwargs.setdefault("user", self.user)

    return methods[resource_type](*args, **kwargs)
@pytest.mark.parametrize("cloud_storage_id", [3])
@pytest.mark.parametrize(
@ -234,26 +224,11 @@ class TestImportResource:
self, cloud_storage_id, obj_id, obj, resource, cloud_storages
):
cloud_storage = cloud_storages[cloud_storage_id]
kwargs = _use_custom_settings(obj, resource, cloud_storage_id)
export_kwargs = _use_custom_settings(obj, resource, cloud_storage_id)
if resource == "backup":
kwargs.pop("format")
kwargs.pop("use_default_location")
export_kwargs.pop("format")
kwargs = _make_custom_resource_params(obj, resource, cloud_storage_id)
export_kwargs = _make_custom_resource_params(obj, resource, cloud_storage_id)
self._export_resource(cloud_storage, obj_id, obj, resource, **export_kwargs)
# export current resource to cloud storage
_save_resource_to_cloud_storage(
self._USERNAME, cloud_storage, obj_id, obj, resource, org_id=self._ORG, **export_kwargs
)
import_resource = {
"annotations": _import_annotations_from_cloud_storage,
"dataset": _import_dataset_from_cloud_storage,
"backup": _import_backup_from_cloud_storage,
}
import_resource[resource](self._USERNAME, obj_id, obj, org_id=self._ORG, **kwargs)
remove_asset(cloud_storage["resource"], kwargs["filename"])
self._import_resource(cloud_storage, resource, obj_id, obj, **kwargs)
@pytest.mark.parametrize(
"obj_id, obj, resource",
@ -284,17 +259,8 @@ class TestImportResource:
task_id = jobs[obj_id]["task_id"]
cloud_storage_id = tasks[task_id]["source_storage"]["cloud_storage_id"]
cloud_storage = cloud_storages[cloud_storage_id]
kwargs = _use_default_settings(obj, resource)
# export current resource to cloud storage
_save_resource_to_cloud_storage(
self._USERNAME, cloud_storage, obj_id, obj, resource, org_id=self._ORG, **kwargs
)
kwargs = _make_default_resource_params(obj, resource)
self._export_resource(cloud_storage, obj_id, obj, resource, **kwargs)
import_resource = {
"annotations": _import_annotations_from_cloud_storage,
"dataset": _import_dataset_from_cloud_storage,
"backup": _import_backup_from_cloud_storage,
}
import_resource[resource](self._USERNAME, obj_id, obj, org_id=self._ORG, **kwargs)
remove_asset(cloud_storage["resource"], kwargs["filename"])
self._import_resource(cloud_storage, resource, obj_id, obj, **kwargs)

@ -0,0 +1,47 @@
# Copyright (C) 2022 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
import boto3
from botocore.exceptions import ClientError
from shared.utils.config import MINIO_ENDPOINT_URL, MINIO_KEY, MINIO_SECRET_KEY
class S3Client:
    """Thin convenience wrapper over a boto3 S3 client for test fixtures."""

    def __init__(self, endpoint_url: str, *, access_key: str, secret_key: str) -> None:
        self.client = self._make_boto_client(
            endpoint_url=endpoint_url, access_key=access_key, secret_key=secret_key
        )

    @staticmethod
    def _make_boto_client(endpoint_url: str, *, access_key: str, secret_key: str):
        # boto3 "resource" wraps a low-level client; hand back that client.
        resource = boto3.resource(
            "s3",
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            endpoint_url=endpoint_url,
        )
        return resource.meta.client

    def create_file(self, bucket: str, filename: str, data: bytes = b""):
        """Upload *data* under key *filename* into *bucket*."""
        self.client.put_object(Body=data, Bucket=bucket, Key=filename)

    def remove_file(self, bucket: str, filename: str):
        """Delete the object *filename* from *bucket*."""
        self.client.delete_object(Bucket=bucket, Key=filename)

    def file_exists(self, bucket: str, filename: str) -> bool:
        """Return True if *filename* exists in *bucket*; re-raise non-404 errors."""
        try:
            self.client.head_object(Bucket=bucket, Key=filename)
        except ClientError as e:
            # Only "not found" means a definitive "no"; anything else is a
            # real failure that the caller should see.
            if e.response["Error"]["Code"] != "404":
                raise
            return False
        return True
def make_client() -> S3Client:
    """Create an S3Client pointed at the test MinIO server from shared config."""
    return S3Client(MINIO_ENDPOINT_URL, access_key=MINIO_KEY, secret_key=MINIO_SECRET_KEY)
Loading…
Cancel
Save