add test for bug with duplicate points in polygon (#3278)
parent
140489dccc
commit
4584b2384b
@ -0,0 +1,46 @@
|
||||
{
|
||||
"CVAT for video 1.1 polygon": {
|
||||
"version": 0,
|
||||
"tags": [],
|
||||
"shapes": [],
|
||||
"tracks": [
|
||||
{
|
||||
"frame": 0,
|
||||
"label_id": null,
|
||||
"group": 1,
|
||||
"source": "manual",
|
||||
"shapes": [
|
||||
{
|
||||
"type": "polygon",
|
||||
"occluded": false,
|
||||
"z_order": 0,
|
||||
"points": [24.62, 13.01, 34.88, 20.03, 18.14, 18.08],
|
||||
"frame": 0,
|
||||
"outside": false,
|
||||
"attributes": []
|
||||
},
|
||||
{
|
||||
"type": "polygon",
|
||||
"occluded": false,
|
||||
"z_order": 0,
|
||||
"points": [24.62, 13.01, 34.88, 20.03, 18.14, 18.08],
|
||||
"frame": 1,
|
||||
"outside": true,
|
||||
"attributes": []
|
||||
},
|
||||
{
|
||||
"type": "polygon",
|
||||
"occluded": false,
|
||||
"z_order": 0,
|
||||
"points": [24.62, 13.01, 34.88, 20.03, 18.14, 18.08],
|
||||
"frame": 2,
|
||||
"outside": false,
|
||||
"keyframe": true,
|
||||
"attributes": []
|
||||
}
|
||||
],
|
||||
"attributes": []
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,57 @@
|
||||
{
|
||||
"main": {
|
||||
"name": "main task",
|
||||
"overlap": 0,
|
||||
"segment_size": 100,
|
||||
"owner_id": 1,
|
||||
"assignee_id": 2,
|
||||
"labels": [
|
||||
{
|
||||
"name": "car",
|
||||
"color": "#2080c0",
|
||||
"attributes": [
|
||||
{
|
||||
"name": "select_name",
|
||||
"mutable": false,
|
||||
"input_type": "select",
|
||||
"default_value": "bmw",
|
||||
"values": ["bmw", "mazda", "renault"]
|
||||
},
|
||||
{
|
||||
"name": "radio_name",
|
||||
"mutable": false,
|
||||
"input_type": "radio",
|
||||
"default_value": "x1",
|
||||
"values": ["x1", "x2", "x3"]
|
||||
},
|
||||
{
|
||||
"name": "check_name",
|
||||
"mutable": true,
|
||||
"input_type": "checkbox",
|
||||
"default_value": "false",
|
||||
"values": ["false"]
|
||||
},
|
||||
{
|
||||
"name": "text_name",
|
||||
"mutable": false,
|
||||
"input_type": "text",
|
||||
"default_value": "qwerty",
|
||||
"values": ["qwerty"]
|
||||
},
|
||||
{
|
||||
"name": "number_name",
|
||||
"mutable": false,
|
||||
"input_type": "number",
|
||||
"default_value": "-4",
|
||||
"values": ["-4", "4", "1"]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "person",
|
||||
"color": "#c06060",
|
||||
"attributes": []
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,191 @@
|
||||
|
||||
# Copyright (C) 2021 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
import copy
|
||||
import json
|
||||
import os.path as osp
|
||||
import random
|
||||
import xml.etree.ElementTree as ET
|
||||
import zipfile
|
||||
from io import BytesIO
|
||||
|
||||
from datumaro.util.test_utils import TestDir
|
||||
from django.contrib.auth.models import Group, User
|
||||
from PIL import Image
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APIClient, APITestCase
|
||||
|
||||
path = osp.join(osp.dirname(__file__), 'assets', 'tasks.json')
|
||||
with open(path) as f:
|
||||
tasks = json.load(f)
|
||||
|
||||
path = osp.join(osp.dirname(__file__), 'assets', 'annotations.json')
|
||||
with open(path) as f:
|
||||
annotations = json.load(f)
|
||||
|
||||
def generate_image_file(filename, size=(100, 50)):
|
||||
f = BytesIO()
|
||||
image = Image.new('RGB', size=size)
|
||||
image.save(f, 'jpeg')
|
||||
f.name = filename
|
||||
f.seek(0)
|
||||
return f
|
||||
|
||||
class ForceLogin:
|
||||
def __init__(self, user, client):
|
||||
self.user = user
|
||||
self.client = client
|
||||
|
||||
def __enter__(self):
|
||||
if self.user:
|
||||
self.client.force_login(self.user,
|
||||
backend='django.contrib.auth.backends.ModelBackend')
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, exception_type, exception_value, traceback):
|
||||
if self.user:
|
||||
self.client.logout()
|
||||
|
||||
class _DbTestBase(APITestCase):
|
||||
def setUp(self):
|
||||
self.client = APIClient()
|
||||
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
cls.create_db_users()
|
||||
|
||||
@classmethod
|
||||
def create_db_users(cls):
|
||||
(group_admin, _) = Group.objects.get_or_create(name="admin")
|
||||
(group_user, _) = Group.objects.get_or_create(name="user")
|
||||
|
||||
user_admin = User.objects.create_superuser(username="admin", email="",
|
||||
password="admin")
|
||||
user_admin.groups.add(group_admin)
|
||||
user_dummy = User.objects.create_user(username="user", password="user")
|
||||
user_dummy.groups.add(group_user)
|
||||
|
||||
cls.admin = user_admin
|
||||
cls.user = user_dummy
|
||||
|
||||
def _put_api_v1_task_id_annotations(self, tid, data):
|
||||
with ForceLogin(self.admin, self.client):
|
||||
response = self.client.put("/api/v1/tasks/%s/annotations" % tid,
|
||||
data=data, format="json")
|
||||
|
||||
return response
|
||||
|
||||
@staticmethod
|
||||
def _generate_task_images(count): # pylint: disable=no-self-use
|
||||
images = {"client_files[%d]" % i: generate_image_file("image_%d.jpg" % i) for i in range(count)}
|
||||
images["image_quality"] = 75
|
||||
return images
|
||||
|
||||
def _create_task(self, data, image_data):
|
||||
with ForceLogin(self.user, self.client):
|
||||
response = self.client.post('/api/v1/tasks', data=data, format="json")
|
||||
assert response.status_code == status.HTTP_201_CREATED, response.status_code
|
||||
tid = response.data["id"]
|
||||
|
||||
response = self.client.post("/api/v1/tasks/%s/data" % tid,
|
||||
data=image_data)
|
||||
assert response.status_code == status.HTTP_202_ACCEPTED, response.status_code
|
||||
|
||||
response = self.client.get("/api/v1/tasks/%s" % tid)
|
||||
task = response.data
|
||||
|
||||
return task
|
||||
|
||||
def _get_request_with_data(self, path, data, user):
|
||||
with ForceLogin(user, self.client):
|
||||
response = self.client.get(path, data)
|
||||
return response
|
||||
|
||||
def _create_annotations(self, task, name_ann, key_get_values):
|
||||
tmp_annotations = copy.deepcopy(annotations[name_ann])
|
||||
|
||||
# change attributes in all annotations
|
||||
for item in tmp_annotations:
|
||||
if item in ["tags", "shapes", "tracks"]:
|
||||
for index_elem, _ in enumerate(tmp_annotations[item]):
|
||||
tmp_annotations[item][index_elem]["label_id"] = task["labels"][0]["id"]
|
||||
|
||||
for index_attribute, attribute in enumerate(task["labels"][0]["attributes"]):
|
||||
spec_id = task["labels"][0]["attributes"][index_attribute]["id"]
|
||||
|
||||
if key_get_values == "random":
|
||||
if attribute["input_type"] == "number":
|
||||
start = int(attribute["values"][0])
|
||||
stop = int(attribute["values"][1]) + 1
|
||||
step = int(attribute["values"][2])
|
||||
value = str(random.randrange(start, stop, step))
|
||||
else:
|
||||
value = random.choice(task["labels"][0]["attributes"][index_attribute]["values"])
|
||||
elif key_get_values == "default":
|
||||
value = attribute["default_value"]
|
||||
|
||||
if item == "tracks" and attribute["mutable"]:
|
||||
for index_shape, _ in enumerate(tmp_annotations[item][index_elem]["shapes"]):
|
||||
tmp_annotations[item][index_elem]["shapes"][index_shape]["attributes"].append({
|
||||
"spec_id": spec_id,
|
||||
"value": value,
|
||||
})
|
||||
else:
|
||||
tmp_annotations[item][index_elem]["attributes"].append({
|
||||
"spec_id": spec_id,
|
||||
"value": value,
|
||||
})
|
||||
|
||||
response = self._put_api_v1_task_id_annotations(task["id"], tmp_annotations)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
def _download_file(self, url, data, user, file_name):
|
||||
response = self._get_request_with_data(url, data, user)
|
||||
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
|
||||
response = self._get_request_with_data(url, data, user)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
content = BytesIO(b"".join(response.streaming_content))
|
||||
with open(file_name, "wb") as f:
|
||||
f.write(content.getvalue())
|
||||
|
||||
def _check_downloaded_file(self, file_name):
|
||||
if not osp.exists(file_name):
|
||||
raise FileNotFoundError(f"File '{file_name}' was not downloaded")
|
||||
|
||||
def _generate_url_dump_tasks_annotations(self, task_id):
|
||||
return f"/api/v1/tasks/{task_id}/annotations"
|
||||
|
||||
class TaskDumpUploadTest(_DbTestBase):
|
||||
def test_api_v1_check_duplicated_polygon_points(self):
|
||||
test_name = self._testMethodName
|
||||
images = self._generate_task_images(10)
|
||||
task = self._create_task(tasks["main"], images)
|
||||
task_id = task["id"]
|
||||
data = {
|
||||
"format": "CVAT for video 1.1",
|
||||
"action": "download",
|
||||
}
|
||||
annotation_name = "CVAT for video 1.1 polygon"
|
||||
self._create_annotations(task, annotation_name, "default")
|
||||
annotation_points = annotations[annotation_name]["tracks"][0]["shapes"][0]['points']
|
||||
|
||||
with TestDir() as test_dir:
|
||||
url = self._generate_url_dump_tasks_annotations(task_id)
|
||||
file_zip_name = osp.join(test_dir, f'{test_name}.zip')
|
||||
self._download_file(url, data, self.admin, file_zip_name)
|
||||
self._check_downloaded_file(file_zip_name)
|
||||
|
||||
folder_name = osp.join(test_dir, f'{test_name}')
|
||||
with zipfile.ZipFile(file_zip_name, 'r') as zip_ref:
|
||||
zip_ref.extractall(folder_name)
|
||||
|
||||
tree = ET.parse(osp.join(folder_name, 'annotations.xml'))
|
||||
root = tree.getroot()
|
||||
for polygon in root.findall("./track[@id='0']/polygon"):
|
||||
polygon_points = polygon.attrib["points"].replace(",", ";")
|
||||
polygon_points = [float(p) for p in polygon_points.split(";")]
|
||||
self.assertEqual(polygon_points, annotation_points)
|
||||
Loading…
Reference in New Issue