|
|
|
|
@ -1,4 +1,3 @@
|
|
|
|
|
|
|
|
|
|
# Copyright (C) 2021 Intel Corporation
|
|
|
|
|
#
|
|
|
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
|
@ -11,18 +10,23 @@ import xml.etree.ElementTree as ET
|
|
|
|
|
import zipfile
|
|
|
|
|
from io import BytesIO
|
|
|
|
|
|
|
|
|
|
from datumaro.util.test_utils import TestDir
|
|
|
|
|
from datumaro.components.dataset import Dataset
|
|
|
|
|
from datumaro.util.test_utils import TestDir, compare_datasets
|
|
|
|
|
from django.contrib.auth.models import Group, User
|
|
|
|
|
from PIL import Image
|
|
|
|
|
from rest_framework import status
|
|
|
|
|
from rest_framework.test import APIClient, APITestCase
|
|
|
|
|
|
|
|
|
|
path = osp.join(osp.dirname(__file__), 'assets', 'tasks.json')
|
|
|
|
|
with open(path) as f:
|
|
|
|
|
from cvat.apps.dataset_manager.bindings import CvatTaskDataExtractor, TaskData
|
|
|
|
|
from cvat.apps.dataset_manager.task import TaskAnnotation
|
|
|
|
|
from cvat.apps.engine.models import Task
|
|
|
|
|
|
|
|
|
|
tasks_path = osp.join(osp.dirname(__file__), 'assets', 'tasks.json')
|
|
|
|
|
with open(tasks_path) as f:
|
|
|
|
|
tasks = json.load(f)
|
|
|
|
|
|
|
|
|
|
path = osp.join(osp.dirname(__file__), 'assets', 'annotations.json')
|
|
|
|
|
with open(path) as f:
|
|
|
|
|
annotations_path = osp.join(osp.dirname(__file__), 'assets', 'annotations.json')
|
|
|
|
|
with open(annotations_path) as f:
|
|
|
|
|
annotations = json.load(f)
|
|
|
|
|
|
|
|
|
|
def generate_image_file(filename, size=(100, 50)):
|
|
|
|
|
@ -104,6 +108,16 @@ class _DbTestBase(APITestCase):
|
|
|
|
|
response = self.client.get(path, data)
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
def _put_request_with_data(self, path, data, user):
|
|
|
|
|
with ForceLogin(user, self.client):
|
|
|
|
|
response = self.client.put(path, data)
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
def _delete_request(self, path, user):
|
|
|
|
|
with ForceLogin(user, self.client):
|
|
|
|
|
response = self.client.delete(path)
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
def _create_annotations(self, task, name_ann, key_get_values):
|
|
|
|
|
tmp_annotations = copy.deepcopy(annotations[name_ann])
|
|
|
|
|
|
|
|
|
|
@ -152,6 +166,12 @@ class _DbTestBase(APITestCase):
|
|
|
|
|
with open(file_name, "wb") as f:
|
|
|
|
|
f.write(content.getvalue())
|
|
|
|
|
|
|
|
|
|
def _upload_file(self, url, data, user):
|
|
|
|
|
response = self._put_request_with_data(url, {"annotation_file": data}, user)
|
|
|
|
|
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
|
|
|
|
|
response = self._put_request_with_data(url, {}, user)
|
|
|
|
|
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
|
|
|
|
|
|
|
|
|
|
def _check_downloaded_file(self, file_name):
|
|
|
|
|
if not osp.exists(file_name):
|
|
|
|
|
raise FileNotFoundError(f"File '{file_name}' was not downloaded")
|
|
|
|
|
@ -159,6 +179,14 @@ class _DbTestBase(APITestCase):
|
|
|
|
|
def _generate_url_dump_tasks_annotations(self, task_id):
|
|
|
|
|
return f"/api/v1/tasks/{task_id}/annotations"
|
|
|
|
|
|
|
|
|
|
def _generate_url_upload_tasks_annotations(self, task_id, upload_format_name):
|
|
|
|
|
return f"/api/v1/tasks/{task_id}/annotations?format={upload_format_name}"
|
|
|
|
|
|
|
|
|
|
def _remove_annotations(self, url, user):
|
|
|
|
|
response = self._delete_request(url, user)
|
|
|
|
|
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
class TaskDumpUploadTest(_DbTestBase):
|
|
|
|
|
def test_api_v1_check_duplicated_polygon_points(self):
|
|
|
|
|
test_name = self._testMethodName
|
|
|
|
|
@ -189,3 +217,49 @@ class TaskDumpUploadTest(_DbTestBase):
|
|
|
|
|
polygon_points = polygon.attrib["points"].replace(",", ";")
|
|
|
|
|
polygon_points = [float(p) for p in polygon_points.split(";")]
|
|
|
|
|
self.assertEqual(polygon_points, annotation_points)
|
|
|
|
|
|
|
|
|
|
def test_api_v1_check_widerface_with_all_attributes(self):
|
|
|
|
|
test_name = self._testMethodName
|
|
|
|
|
dump_format_name = "WiderFace 1.0"
|
|
|
|
|
upload_format_name = "WiderFace 1.0"
|
|
|
|
|
|
|
|
|
|
for include_images in (False, True):
|
|
|
|
|
with self.subTest():
|
|
|
|
|
# create task with annotations
|
|
|
|
|
images = self._generate_task_images(3)
|
|
|
|
|
task = self._create_task(tasks["widerface with all attributes"], images)
|
|
|
|
|
self._create_annotations(task, f'{dump_format_name}', "random")
|
|
|
|
|
|
|
|
|
|
task_id = task["id"]
|
|
|
|
|
task_ann = TaskAnnotation(task_id)
|
|
|
|
|
task_ann.init_from_db()
|
|
|
|
|
task_data = TaskData(task_ann.ir_data, Task.objects.get(pk=task_id))
|
|
|
|
|
extractor = CvatTaskDataExtractor(task_data, include_images=include_images)
|
|
|
|
|
data_from_task_before_upload = Dataset.from_extractors(extractor)
|
|
|
|
|
|
|
|
|
|
# dump annotations
|
|
|
|
|
url = self._generate_url_dump_tasks_annotations(task_id)
|
|
|
|
|
data = {
|
|
|
|
|
"format": dump_format_name,
|
|
|
|
|
"action": "download",
|
|
|
|
|
}
|
|
|
|
|
with TestDir() as test_dir:
|
|
|
|
|
file_zip_name = osp.join(test_dir, f'{test_name}_{dump_format_name}.zip')
|
|
|
|
|
self._download_file(url, data, self.admin, file_zip_name)
|
|
|
|
|
self._check_downloaded_file(file_zip_name)
|
|
|
|
|
|
|
|
|
|
# remove annotations
|
|
|
|
|
self._remove_annotations(url, self.admin)
|
|
|
|
|
|
|
|
|
|
# upload annotations
|
|
|
|
|
url = self._generate_url_upload_tasks_annotations(task_id, upload_format_name)
|
|
|
|
|
with open(file_zip_name, 'rb') as binary_file:
|
|
|
|
|
self._upload_file(url, binary_file, self.admin)
|
|
|
|
|
|
|
|
|
|
# equals annotations
|
|
|
|
|
task_ann = TaskAnnotation(task_id)
|
|
|
|
|
task_ann.init_from_db()
|
|
|
|
|
task_data = TaskData(task_ann.ir_data, Task.objects.get(pk=task_id))
|
|
|
|
|
extractor = CvatTaskDataExtractor(task_data, include_images=include_images)
|
|
|
|
|
data_from_task_after_upload = Dataset.from_extractors(extractor)
|
|
|
|
|
compare_datasets(self, data_from_task_before_upload, data_from_task_after_upload)
|
|
|
|
|
|