@ -18,6 +18,9 @@ from unittest import mock
import io
import xml . etree . ElementTree as ET
from collections import defaultdict
import zipfile
from pycocotools import coco as coco_loader
import tempfile
def create_db_users ( cls ) :
( group_admin , _ ) = Group . objects . get_or_create ( name = " admin " )
@ -1540,7 +1543,7 @@ class TaskDataAPITestCase(APITestCase):
response = self . _create_task ( None , data )
self . assertEqual ( response . status_code , status . HTTP_401_UNAUTHORIZED )
def compare_objects ( self , obj1 , obj2 , ignore_keys ):
def compare_objects ( self , obj1 , obj2 , ignore_keys , fp_tolerance = .001 ):
if isinstance ( obj1 , dict ) :
self . assertTrue ( isinstance ( obj2 , dict ) , " {} != {} " . format ( obj1 , obj2 ) )
for k in obj1 . keys ( ) :
@ -1553,7 +1556,10 @@ def compare_objects(self, obj1, obj2, ignore_keys):
for v1 , v2 in zip ( obj1 , obj2 ) :
compare_objects ( self , v1 , v2 , ignore_keys )
else :
self . assertEqual ( obj1 , obj2 )
if isinstance ( obj1 , float ) or isinstance ( obj2 , float ) :
self . assertAlmostEqual ( obj1 , obj2 , delta = fp_tolerance )
else :
self . assertEqual ( obj1 , obj2 )
class JobAnnotationAPITestCase ( APITestCase ) :
def setUp ( self ) :
@ -2117,25 +2123,38 @@ class TaskAnnotationAPITestCase(JobAnnotationAPITestCase):
return response
def _upload_api_v1_tasks_id_annotations ( self , pk , user , data , query_params = " " ) :
with ForceLogin ( user , self . client ) :
response = self . client . put (
path = " /api/v1/tasks/ {0} /annotations? {1} " . format ( pk , query_params ) ,
data = data ,
format = " multipart " ,
)
return response
def _get_annotation_formats ( self , user ) :
with ForceLogin ( user , self . client ) :
response = self . client . get (
path = " /api/v1/server/annotation/formats "
)
return response
def _check_response ( self , response , data ) :
if not response . status_code in [
status . HTTP_401_UNAUTHORIZED , status . HTTP_403_FORBIDDEN ] :
compare_objects ( self , data , response . data , ignore_keys = [ " id " ] )
def _run_api_v1_tasks_id_annotations ( self , owner , assignee , annotator ) :
task , jobs = self . _create_task ( owner , assignee )
task , _ = self . _create_task ( owner , assignee )
if annotator :
HTTP_200_OK = status . HTTP_200_OK
HTTP_204_NO_CONTENT = status . HTTP_204_NO_CONTENT
HTTP_400_BAD_REQUEST = status . HTTP_400_BAD_REQUEST
HTTP_202_ACCEPTED = status . HTTP_202_ACCEPTED
HTTP_201_CREATED = status . HTTP_201_CREATED
else :
HTTP_200_OK = status . HTTP_401_UNAUTHORIZED
HTTP_204_NO_CONTENT = status . HTTP_401_UNAUTHORIZED
HTTP_400_BAD_REQUEST = status . HTTP_401_UNAUTHORIZED
HTTP_202_ACCEPTED = status . HTTP_401_UNAUTHORIZED
HTTP_201_CREATED = status . HTTP_401_UNAUTHORIZED
data = {
" version " : 0 ,
@ -2503,51 +2522,262 @@ class TaskAnnotationAPITestCase(JobAnnotationAPITestCase):
" create " , data )
self . assertEqual ( response . status_code , HTTP_400_BAD_REQUEST )
cvat_format = AnnotationFormat . objects . get ( name = " CVAT " )
for annotation_handler in cvat_format . annotationdumper_set . all ( ) :
response = self . _dump_api_v1_tasks_id_annotations ( task [ " id " ] , annotator ,
" format= {} " . format ( annotation_handler . display_name ) )
self . assertEqual ( response . status_code , HTTP_202_ACCEPTED )
response = self . _dump_api_v1_tasks_id_annotations ( task [ " id " ] , annotator ,
" format= {} " . format ( annotation_handler . display_name ) )
self . assertEqual ( response . status_code , HTTP_201_CREATED )
response = self . _dump_api_v1_tasks_id_annotations ( task [ " id " ] , annotator ,
" action=download&format= {} " . format ( annotation_handler . display_name ) )
self . assertEqual ( response . status_code , HTTP_200_OK )
self . _check_dump_response ( response , task , jobs , data )
def _check_dump_response ( self , response , task , jobs , data ) :
if response . status_code == status . HTTP_200_OK :
def etree_to_dict ( t ) :
d = { t . tag : { } if t . attrib else None }
children = list ( t )
if children :
dd = defaultdict ( list )
for dc in map ( etree_to_dict , children ) :
for k , v in dc . items ( ) :
dd [ k ] . append ( v )
d = { t . tag : { k : v [ 0 ] if len ( v ) == 1 else v
for k , v in dd . items ( ) } }
if t . attrib :
d [ t . tag ] . update ( ( ' @ ' + k , v ) for k , v in t . attrib . items ( ) )
if t . text :
text = t . text . strip ( )
if not ( children or t . attrib ) :
d [ t . tag ] = text
return d
self . assertTrue ( response . streaming )
content = io . BytesIO ( b ' ' . join ( response . streaming_content ) )
xmldump = ET . fromstring ( content . read ( ) )
def _run_api_v1_tasks_id_annotations_dump_load ( self , owner , assignee , annotator ) :
if annotator :
HTTP_200_OK = status . HTTP_200_OK
HTTP_204_NO_CONTENT = status . HTTP_204_NO_CONTENT
HTTP_202_ACCEPTED = status . HTTP_202_ACCEPTED
HTTP_201_CREATED = status . HTTP_201_CREATED
else :
HTTP_200_OK = status . HTTP_401_UNAUTHORIZED
HTTP_204_NO_CONTENT = status . HTTP_401_UNAUTHORIZED
HTTP_202_ACCEPTED = status . HTTP_401_UNAUTHORIZED
HTTP_201_CREATED = status . HTTP_401_UNAUTHORIZED
def _get_initial_annotation ( annotation_format ) :
rectangle_tracks_with_attrs = [ {
" frame " : 0 ,
" label_id " : task [ " labels " ] [ 0 ] [ " id " ] ,
" group " : 0 ,
" attributes " : [
{
" spec_id " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 0 ] [ " id " ] ,
" value " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 0 ] [ " values " ] [ 0 ]
} ,
] ,
" shapes " : [
{
" frame " : 0 ,
" points " : [ 1.0 , 2.1 , 50.1 , 30.22 ] ,
" type " : " rectangle " ,
" occluded " : False ,
" outside " : False ,
" attributes " : [
{
" spec_id " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 1 ] [ " id " ] ,
" value " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 1 ] [ " default_value " ]
}
]
} ,
{
" frame " : 1 ,
" points " : [ 2.0 , 2.1 , 77.2 , 36.22 ] ,
" type " : " rectangle " ,
" occluded " : True ,
" outside " : True ,
" attributes " : [
{
" spec_id " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 1 ] [ " id " ] ,
" value " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 1 ] [ " default_value " ]
}
]
} ,
]
} ]
rectangle_tracks_wo_attrs = [ {
" frame " : 1 ,
" label_id " : task [ " labels " ] [ 1 ] [ " id " ] ,
" group " : 0 ,
" attributes " : [ ] ,
" shapes " : [
{
" frame " : 1 ,
" attributes " : [ ] ,
" points " : [ 1.0 , 2.1 , 50.2 , 36.6 ] ,
" type " : " rectangle " ,
" occluded " : False ,
" outside " : False
} ,
{
" frame " : 2 ,
" attributes " : [ ] ,
" points " : [ 1.0 , 2.1 , 51 , 36.6 ] ,
" type " : " rectangle " ,
" occluded " : False ,
" outside " : True
}
]
} ]
rectangle_shapes_with_attrs = [ {
" frame " : 0 ,
" label_id " : task [ " labels " ] [ 0 ] [ " id " ] ,
" group " : 0 ,
" attributes " : [
{
" spec_id " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 0 ] [ " id " ] ,
" value " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 0 ] [ " values " ] [ 0 ]
} ,
{
" spec_id " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 1 ] [ " id " ] ,
" value " : task [ " labels " ] [ 0 ] [ " attributes " ] [ 1 ] [ " default_value " ]
}
] ,
" points " : [ 1.0 , 2.1 , 10.6 , 53.22 ] ,
" type " : " rectangle " ,
" occluded " : False
} ]
rectangle_shapes_wo_attrs = [ {
" frame " : 1 ,
" label_id " : task [ " labels " ] [ 1 ] [ " id " ] ,
" group " : 0 ,
" attributes " : [ ] ,
" points " : [ 2.0 , 2.1 , 40 , 50.7 ] ,
" type " : " rectangle " ,
" occluded " : False
} ]
polygon_shapes_wo_attrs = [ {
" frame " : 1 ,
" label_id " : task [ " labels " ] [ 1 ] [ " id " ] ,
" group " : 0 ,
" attributes " : [ ] ,
" points " : [ 2.0 , 2.1 , 100 , 30.22 , 40 , 77 , 1 , 3 ] ,
" type " : " polygon " ,
" occluded " : False
} ]
annotations = {
" version " : 0 ,
" tags " : [ ] ,
" shapes " : [ ] ,
" tracks " : [ ] ,
}
if annotation_format == " CVAT XML 1.1 for videos " :
annotations [ " tracks " ] = rectangle_tracks_with_attrs + rectangle_tracks_wo_attrs
elif annotation_format == " CVAT XML 1.1 for images " :
annotations [ " shapes " ] = rectangle_shapes_with_attrs + rectangle_shapes_wo_attrs
elif annotation_format == " PASCAL VOC ZIP 1.0 " or \
annotation_format == " YOLO ZIP 1.0 " or \
annotation_format == " TFRecord ZIP 1.0 " :
annotations [ " shapes " ] = rectangle_shapes_wo_attrs
elif annotation_format == " COCO JSON 1.0 " :
annotations [ " shapes " ] = polygon_shapes_wo_attrs
elif annotation_format == " MASK ZIP 1.0 " :
annotations [ " shapes " ] = rectangle_shapes_with_attrs + rectangle_shapes_wo_attrs + polygon_shapes_wo_attrs
annotations [ " tracks " ] = rectangle_tracks_with_attrs + rectangle_tracks_wo_attrs
return annotations
response = self . _get_annotation_formats ( annotator )
self . assertEqual ( response . status_code , HTTP_200_OK )
if annotator is not None :
supported_formats = response . data
else :
supported_formats = [ {
" name " : " CVAT " ,
" dumpers " : [ {
" display_name " : " CVAT XML 1.1 for images "
} ] ,
" loaders " : [ {
" display_name " : " CVAT XML 1.1 "
} ]
} ]
self . assertTrue ( isinstance ( supported_formats , list ) and supported_formats )
for annotation_format in supported_formats :
for dumper in annotation_format [ " dumpers " ] :
# 1. create task
task , jobs = self . _create_task ( owner , assignee )
# 2. add annotation
data = _get_initial_annotation ( dumper [ " display_name " ] )
response = self . _put_api_v1_tasks_id_annotations ( task [ " id " ] , annotator , data )
data [ " version " ] + = 1
self . assertEqual ( response . status_code , HTTP_200_OK )
self . _check_response ( response , data )
# 3. download annotation
response = self . _dump_api_v1_tasks_id_annotations ( task [ " id " ] , annotator ,
" format= {} " . format ( dumper [ " display_name " ] ) )
self . assertEqual ( response . status_code , HTTP_202_ACCEPTED )
response = self . _dump_api_v1_tasks_id_annotations ( task [ " id " ] , annotator ,
" format= {} " . format ( dumper [ " display_name " ] ) )
self . assertEqual ( response . status_code , HTTP_201_CREATED )
response = self . _dump_api_v1_tasks_id_annotations ( task [ " id " ] , annotator ,
" action=download&format= {} " . format ( dumper [ " display_name " ] ) )
self . assertEqual ( response . status_code , HTTP_200_OK )
# 4. check downloaded data
if response . status_code == status . HTTP_200_OK :
self . assertTrue ( response . streaming )
content = io . BytesIO ( b " " . join ( response . streaming_content ) )
self . _check_dump_content ( content , task , jobs , data , annotation_format [ " name " ] )
content . seek ( 0 )
# 5. remove annotation form the task
response = self . _delete_api_v1_tasks_id_annotations ( task [ " id " ] , annotator )
data [ " version " ] + = 1
self . assertEqual ( response . status_code , HTTP_204_NO_CONTENT )
# 6. upload annotation and check annotation
uploaded_data = {
" annotation_file " : content ,
}
for loader in annotation_format [ " loaders " ] :
response = self . _upload_api_v1_tasks_id_annotations ( task [ " id " ] , annotator , uploaded_data , " format= {} " . format ( loader [ " display_name " ] ) )
self . assertEqual ( response . status_code , HTTP_202_ACCEPTED )
response = self . _upload_api_v1_tasks_id_annotations ( task [ " id " ] , annotator , { } , " format= {} " . format ( loader [ " display_name " ] ) )
self . assertEqual ( response . status_code , HTTP_201_CREATED )
response = self . _get_api_v1_tasks_id_annotations ( task [ " id " ] , annotator )
self . assertEqual ( response . status_code , HTTP_200_OK )
data [ " version " ] + = 2 # upload is delete + put
self . _check_response ( response , data )
def _check_dump_content ( self , content , task , jobs , data , annotation_format_name ) :
def etree_to_dict ( t ) :
d = { t . tag : { } if t . attrib else None }
children = list ( t )
if children :
dd = defaultdict ( list )
for dc in map ( etree_to_dict , children ) :
for k , v in dc . items ( ) :
dd [ k ] . append ( v )
d = { t . tag : { k : v [ 0 ] if len ( v ) == 1 else v
for k , v in dd . items ( ) } }
if t . attrib :
d [ t . tag ] . update ( ( ' @ ' + k , v ) for k , v in t . attrib . items ( ) )
if t . text :
text = t . text . strip ( )
if not ( children or t . attrib ) :
d [ t . tag ] = text
return d
if annotation_format_name == " CVAT " :
xmldump = ET . fromstring ( content . read ( ) )
self . assertEqual ( xmldump . tag , " annotations " )
tags = xmldump . findall ( " ./meta " )
self . assertEqual ( len ( tags ) , 1 )
meta = etree_to_dict ( tags [ 0 ] ) [ " meta " ]
self . assertEqual ( meta [ " task " ] [ " name " ] , task [ " name " ] )
elif annotation_format_name == " PASCAL VOC " :
self . assertTrue ( zipfile . is_zipfile ( content ) )
elif annotation_format_name == " YOLO " :
self . assertTrue ( zipfile . is_zipfile ( content ) )
elif annotation_format_name == " COCO " :
with tempfile . NamedTemporaryFile ( ) as tmp_file :
tmp_file . write ( content . read ( ) )
tmp_file . flush ( )
coco = coco_loader . COCO ( tmp_file . name )
self . assertTrue ( coco . getAnnIds ( ) )
elif annotation_format_name == " TFRecord " :
self . assertTrue ( zipfile . is_zipfile ( content ) )
elif annotation_format_name == " MASK " :
self . assertTrue ( zipfile . is_zipfile ( content ) )
def test_api_v1_tasks_id_annotations_admin ( self ) :
self . _run_api_v1_tasks_id_annotations ( self . admin , self . assignee ,
@ -2560,7 +2790,16 @@ class TaskAnnotationAPITestCase(JobAnnotationAPITestCase):
def test_api_v1_tasks_id_annotations_no_auth ( self ) :
self . _run_api_v1_tasks_id_annotations ( self . user , self . assignee , None )
def test_api_v1_tasks_id_annotations_dump_load_admin ( self ) :
self . _run_api_v1_tasks_id_annotations_dump_load ( self . admin , self . assignee ,
self . assignee )
def test_api_v1_tasks_id_annotations_dump_load_user ( self ) :
self . _run_api_v1_tasks_id_annotations_dump_load ( self . user , self . assignee ,
self . assignee )
def test_api_v1_tasks_id_annotations_dump_load_no_auth ( self ) :
self . _run_api_v1_tasks_id_annotations_dump_load ( self . user , self . assignee , None )
class ServerShareAPITestCase ( APITestCase ) :
def setUp ( self ) :