@ -3,52 +3,34 @@
#
# SPDX-License-Identifier: MIT
import sys
import os . path as osp
from collections import OrderedDict , namedtuple
from collections import namedtuple
from typing import Any , Callable , DefaultDict , Dict , List , Literal , Mapping , NamedTuple , OrderedDict , Tuple , Union
from pathlib import Path
from django . utils import timezone
import datumaro . components . extractor as datumaro
from cvat . apps . engine . frame_provider import FrameProvider
from cvat . apps . engine . models import AttributeType , ShapeType , DimensionType, Image as Img
from cvat . apps . engine . models import AttributeType , ShapeType , Project, Task , Label , DimensionType, Image as Img
from datumaro . util import cast
from datumaro . util . image import ByteImage , Image
from . annotation import AnnotationManager , TrackManager
from . annotation import AnnotationManager , TrackManager , AnnotationIR
class TaskData :
Attribute = namedtuple ( ' Attribute ' , ' name, value ' )
Shape = namedtuple ( " Shape " , ' id, label_id ' ) # 3d
LabeledShape = namedtuple (
' LabeledShape ' , ' type, frame, label, points, occluded, attributes, source, group, z_order ' )
LabeledShape . __new__ . __defaults__ = ( 0 , 0 )
TrackedShape = namedtuple (
' TrackedShape ' , ' type, frame, points, occluded, outside, keyframe, attributes, source, group, z_order, label, track_id ' )
TrackedShape . __new__ . __defaults__ = ( ' manual ' , 0 , 0 , None , 0 )
Track = namedtuple ( ' Track ' , ' label, group, source, shapes ' )
Tag = namedtuple ( ' Tag ' , ' frame, label, attributes, source, group ' )
Tag . __new__ . __defaults__ = ( 0 , )
Frame = namedtuple (
' Frame ' , ' idx, id, frame, name, width, height, labeled_shapes, tags, shapes, labels ' )
Labels = namedtuple ( ' Label ' , ' id, name, color ' )
class InstanceLabelData :
Attribute = NamedTuple ( ' Attribute ' , [ ( ' name ' , str ) , ( ' value ' , Any ) ] )
def __init__ ( self , annotation_ir , db_task , host = ' ' , create_callback = None ) :
self . _annotation_ir = annotation_ir
self . _db_task = db_task
self . _host = host
self . _create_callback = create_callback
self . _MAX_ANNO_SIZE = 30000
self . _frame_info = { }
self . _frame_mapping = { }
self . _frame_step = db_task . data . get_frame_step ( )
def __init__ ( self , instance : Union [ Task , Project ] ) - > None :
instance = instance . project if isinstance ( instance , Task ) and instance . project_id is not None else instance
db_labels = ( self . _db_task . project if self . _db_task . project_id else self . _db_task ) . label_set . all ( ) . prefetch_related (
' attributespec_set ' ) . order_by ( ' pk ' )
db_labels = instance . label_set . all ( ) . prefetch_related ( ' attributespec_set ' ) . order_by ( ' pk ' )
self . _label_mapping = OrderedDict (
( db_label . id , db_label ) for db_label in db_labels )
self . _label_mapping = OrderedDict [ int , Label ] (
( ( db_label . id , db_label ) for db_label in db_labels ) ,
)
self . _attribute_mapping = { db_label . id : {
' mutable ' : { } , ' immutable ' : { } , ' spec ' : { } }
@ -69,9 +51,6 @@ class TaskData:
* * attr_mapping [ ' immutable ' ] ,
}
self . _init_frame_info ( )
self . _init_meta ( )
def _get_label_id ( self , label_name ) :
for db_label in self . _label_mapping . values ( ) :
if label_name == db_label . name :
@ -103,6 +82,71 @@ class TaskData:
def _get_immutable_attribute_id ( self , label_id , attribute_name ) :
return self . _get_attribute_id ( label_id , attribute_name , ' immutable ' )
def _import_attribute ( self , label_id , attribute ) :
spec_id = self . _get_attribute_id ( label_id , attribute . name )
value = attribute . value
if spec_id :
spec = self . _attribute_mapping [ label_id ] [ ' spec ' ] [ spec_id ]
try :
if spec . input_type == AttributeType . NUMBER :
pass # no extra processing required
elif spec . input_type == AttributeType . CHECKBOX :
if isinstance ( value , str ) :
value = value . lower ( )
assert value in { ' true ' , ' false ' }
elif isinstance ( value , ( bool , int , float ) ) :
value = ' true ' if value else ' false '
else :
raise ValueError ( " Unexpected attribute value " )
except Exception as e :
raise Exception ( " Failed to convert attribute ' %s ' = ' %s ' : %s " %
( self . _get_label_name ( label_id ) , value , e ) )
return { ' spec_id ' : spec_id , ' value ' : value }
def _export_attributes ( self , attributes ) :
exported_attributes = [ ]
for attr in attributes :
attribute_name = self . _get_attribute_name ( attr [ " spec_id " ] )
exported_attributes . append ( InstanceLabelData . Attribute (
name = attribute_name ,
value = attr [ " value " ] ,
) )
return exported_attributes
class TaskData ( InstanceLabelData ) :
Shape = namedtuple ( " Shape " , ' id, label_id ' ) # 3d
LabeledShape = namedtuple (
' LabeledShape ' , ' type, frame, label, points, occluded, attributes, source, group, z_order ' )
LabeledShape . __new__ . __defaults__ = ( 0 , 0 )
TrackedShape = namedtuple (
' TrackedShape ' , ' type, frame, points, occluded, outside, keyframe, attributes, source, group, z_order, label, track_id ' )
TrackedShape . __new__ . __defaults__ = ( ' manual ' , 0 , 0 , None , 0 )
Track = namedtuple ( ' Track ' , ' label, group, source, shapes ' )
Tag = namedtuple ( ' Tag ' , ' frame, label, attributes, source, group ' )
Tag . __new__ . __defaults__ = ( 0 , )
Frame = namedtuple (
' Frame ' , ' idx, id, frame, name, width, height, labeled_shapes, tags, shapes, labels ' )
Labels = namedtuple ( ' Label ' , ' id, name, color ' )
def __init__ ( self , annotation_ir , db_task , host = ' ' , create_callback = None ) :
self . _annotation_ir = annotation_ir
self . _db_task = db_task
self . _host = host
self . _create_callback = create_callback
self . _MAX_ANNO_SIZE = 30000
self . _frame_info = { }
self . _frame_mapping = { }
self . _frame_step = db_task . data . get_frame_step ( )
InstanceLabelData . __init__ ( self , db_task )
self . _init_frame_info ( )
self . _init_meta ( )
def abs_frame_id ( self , relative_id ) :
if relative_id not in range ( 0 , self . _db_task . data . size ) :
raise ValueError ( " Unknown internal frame id %s " % relative_id )
@ -135,79 +179,80 @@ class TaskData:
for frame_number , info in self . _frame_info . items ( )
}
def _init_meta ( self ) :
db_segments = self . _db_task . segment_set . all ( ) . prefetch_related ( ' job_set ' )
self . _meta = OrderedDict ( [
( " task " , OrderedDict ( [
( " id " , str ( self . _db_task . id ) ) ,
( " name " , self . _db_task . name ) ,
( " size " , str ( self . _db_task . data . size ) ) ,
( " mode " , self . _db_task . mode ) ,
( " overlap " , str ( self . _db_task . overlap ) ) ,
( " bugtracker " , self . _db_task . bug_tracker ) ,
( " created " , str ( timezone . localtime ( self . _db_task . created_date ) ) ) ,
( " updated " , str ( timezone . localtime ( self . _db_task . updated_date ) ) ) ,
( " start_frame " , str ( self . _db_task . data . start_frame ) ) ,
( " stop_frame " , str ( self . _db_task . data . stop_frame ) ) ,
( " frame_filter " , self . _db_task . data . frame_filter ) ,
( " labels " , [
( " label " , OrderedDict ( [
( " name " , db_label . name ) ,
( " color " , db_label . color ) ,
( " attributes " , [
( " attribute " , OrderedDict ( [
( " name " , db_attr . name ) ,
( " mutable " , str ( db_attr . mutable ) ) ,
( " input_type " , db_attr . input_type ) ,
( " default_value " , db_attr . default_value ) ,
( " values " , db_attr . values ) ] ) )
for db_attr in db_label . attributespec_set . all ( ) ] )
] ) ) for db_label in self . _label_mapping . values ( )
] ) ,
@staticmethod
def meta_for_task ( db_task , host , label_mapping = None ) :
db_segments = db_task . segment_set . all ( ) . prefetch_related ( ' job_set ' )
meta = OrderedDict ( [
( " id " , str ( db_task . id ) ) ,
( " name " , db_task . name ) ,
( " size " , str ( db_task . data . size ) ) ,
( " mode " , db_task . mode ) ,
( " overlap " , str ( db_task . overlap ) ) ,
( " bugtracker " , db_task . bug_tracker ) ,
( " created " , str ( timezone . localtime ( db_task . created_date ) ) ) ,
( " updated " , str ( timezone . localtime ( db_task . updated_date ) ) ) ,
( " subset " , db_task . subset or datumaro . DEFAULT_SUBSET_NAME ) ,
( " start_frame " , str ( db_task . data . start_frame ) ) ,
( " stop_frame " , str ( db_task . data . stop_frame ) ) ,
( " frame_filter " , db_task . data . frame_filter ) ,
( " segments " , [
( " segment " , OrderedDict ( [
( " id " , str ( db_segment . id ) ) ,
( " start " , str ( db_segment . start_frame ) ) ,
( " stop " , str ( db_segment . stop_frame ) ) ,
( " url " , " {} /?id= {} " . format (
host , db_segment . job_set . all ( ) [ 0 ] . id ) ) ]
) ) for db_segment in db_segments
] ) ,
( " owner " , OrderedDict ( [
( " username " , db_task . owner . username ) ,
( " email " , db_task . owner . email )
] ) if db_task . owner else " " ) ,
( " assignee " , OrderedDict ( [
( " username " , db_task . assignee . username ) ,
( " email " , db_task . assignee . email )
] ) if db_task . assignee else " " ) ,
] )
( " segments " , [
( " segment " , OrderedDict ( [
( " id " , str ( db_segment . id ) ) ,
( " start " , str ( db_segment . start_frame ) ) ,
( " stop " , str ( db_segment . stop_frame ) ) ,
( " url " , " {} /?id= {} " . format (
self . _host , db_segment . job_set . all ( ) [ 0 ] . id ) ) ]
) ) for db_segment in db_segments
] ) ,
if label_mapping is not None :
meta [ ' labels ' ] = [
( " label " , OrderedDict ( [
( " name " , db_label . name ) ,
( " color " , db_label . color ) ,
( " attributes " , [
( " attribute " , OrderedDict ( [
( " name " , db_attr . name ) ,
( " mutable " , str ( db_attr . mutable ) ) ,
( " input_type " , db_attr . input_type ) ,
( " default_value " , db_attr . default_value ) ,
( " values " , db_attr . values ) ] ) )
for db_attr in db_label . attributespec_set . all ( ) ] )
] ) ) for db_label in label_mapping . values ( )
]
if hasattr ( db_task . data , " video " ) :
meta [ " original_size " ] = OrderedDict ( [
( " width " , str ( db_task . data . video . width ) ) ,
( " height " , str ( db_task . data . video . height ) )
] )
( " owner " , OrderedDict ( [
( " username " , self . _db_task . owner . username ) ,
( " email " , self . _db_task . owner . email )
] ) if self . _db_task . owner else " " ) ,
return meta
( " assignee " , OrderedDict ( [
( " username " , self . _db_task . assignee . username ) ,
( " email " , self . _db_task . assignee . email )
] ) if self . _db_task . assignee else " " ) ,
] ) ) ,
def _init_meta ( self ) :
self . _meta = OrderedDict ( [
( " task " , self . meta_for_task ( self . _db_task , self . _host , self . _label_mapping ) ) ,
( " dumped " , str ( timezone . localtime ( timezone . now ( ) ) ) )
] )
if hasattr ( self . _db_task . data , " video " ) :
self . _meta [ " task " ] [ " original_size " ] = OrderedDict ( [
( " width " , str ( self . _db_task . data . video . width ) ) ,
( " height " , str ( self . _db_task . data . video . height ) )
] )
# Add source to dumped file
self . _meta [ " source " ] = str (
osp . basename ( self . _db_task . data . video . path ) )
def _export_attributes ( self , attributes ) :
exported_attributes = [ ]
for attr in attributes :
attribute_name = self . _get_attribute_name ( attr [ " spec_id " ] )
exported_attributes . append ( TaskData . Attribute (
name = attribute_name ,
value = attr [ " value " ] ,
) )
return exported_attributes
def _export_tracked_shape ( self , shape ) :
return TaskData . TrackedShape (
type = shape [ " type " ] ,
@ -356,30 +401,6 @@ class TaskData:
if self . _get_attribute_id ( label_id , attrib . name ) ]
return _tag
def _import_attribute ( self , label_id , attribute ) :
spec_id = self . _get_attribute_id ( label_id , attribute . name )
value = attribute . value
if spec_id :
spec = self . _attribute_mapping [ label_id ] [ ' spec ' ] [ spec_id ]
try :
if spec . input_type == AttributeType . NUMBER :
pass # no extra processing required
elif spec . input_type == AttributeType . CHECKBOX :
if isinstance ( value , str ) :
value = value . lower ( )
assert value in { ' true ' , ' false ' }
elif isinstance ( value , ( bool , int , float ) ) :
value = ' true ' if value else ' false '
else :
raise ValueError ( " Unexpected attribute value " )
except Exception as e :
raise Exception ( " Failed to convert attribute ' %s ' = ' %s ' : %s " %
( self . _get_label_name ( label_id ) , value , e ) )
return { ' spec_id ' : spec_id , ' value ' : value }
def _import_shape ( self , shape ) :
_shape = shape . _asdict ( )
label_id = self . _get_label_id ( _shape . pop ( ' label ' ) )
@ -482,7 +503,328 @@ class TaskData:
return v
return None
class CvatTaskDataExtractor ( datumaro . SourceExtractor ) :
class ProjectData ( InstanceLabelData ) :
LabeledShape = NamedTuple ( ' LabledShape ' , [ ( ' type ' , str ) , ( ' frame ' , int ) , ( ' label ' , str ) , ( ' points ' , List [ float ] ) , ( ' occluded ' , bool ) , ( ' attributes ' , List [ InstanceLabelData . Attribute ] ) , ( ' source ' , str ) , ( ' group ' , int ) , ( ' z_order ' , int ) , ( ' task_id ' , int ) ] )
LabeledShape . __new__ . __defaults__ = ( 0 , 0 )
TrackedShape = NamedTuple ( ' TrackedShape ' ,
[ ( ' type ' , str ) , ( ' frame ' , int ) , ( ' points ' , List [ float ] ) , ( ' occluded ' , bool ) , ( ' outside ' , bool ) , ( ' keyframe ' , bool ) , ( ' attributes ' , List [ InstanceLabelData . Attribute ] ) , ( ' source ' , str ) , ( ' group ' , int ) , ( ' z_order ' , int ) , ( ' label ' , str ) , ( ' track_id ' , int ) ] ,
)
TrackedShape . __new__ . __defaults__ = ( ' manual ' , 0 , 0 , None , 0 )
Track = NamedTuple ( ' Track ' , [ ( ' label ' , str ) , ( ' group ' , int ) , ( ' source ' , str ) , ( ' shapes ' , List [ TrackedShape ] ) , ( ' task_id ' , int ) ] )
Tag = NamedTuple ( ' Tag ' , [ ( ' frame ' , int ) , ( ' label ' , str ) , ( ' attributes ' , List [ InstanceLabelData . Attribute ] ) , ( ' source ' , str ) , ( ' group ' , int ) , ( ' task_id ' , int ) ] )
Tag . __new__ . __defaults__ = ( 0 , )
Frame = NamedTuple ( ' Frame ' , [ ( ' task_id ' , int ) , ( ' subset ' , str ) , ( ' idx ' , int ) , ( ' frame ' , int ) , ( ' name ' , str ) , ( ' width ' , int ) , ( ' height ' , int ) , ( ' labeled_shapes ' , List [ Union [ LabeledShape , TrackedShape ] ] ) , ( ' tags ' , List [ Tag ] ) ] )
def __init__ ( self , annotation_irs : Mapping [ str , AnnotationIR ] , db_project : Project , host : str , create_callback : Callable = None ) :
self . _annotation_irs = annotation_irs
self . _db_project = db_project
self . _db_tasks : OrderedDict [ int , Task ] = OrderedDict (
( ( db_task . id , db_task ) for db_task in db_project . tasks . order_by ( " subset " , " id " ) . all ( ) )
)
self . _subsets = set ( )
self . _host = host
self . _create_callback = create_callback
self . _MAX_ANNO_SIZE = 30000
self . _frame_info : Dict [ Tuple [ int , int ] , Literal [ " path " , " width " , " height " , " subset " ] ] = dict ( )
self . _frame_mapping : Dict [ Tuple [ str , str ] , Tuple [ str , str ] ] = dict ( )
self . _frame_steps : Dict [ int , int ] = { task . id : task . data . get_frame_step ( ) for task in self . _db_tasks . values ( ) }
for task in self . _db_tasks . values ( ) :
self . _subsets . add ( task . subset )
self . _subsets : List [ str ] = list ( self . _subsets )
InstanceLabelData . __init__ ( self , db_project )
self . _init_task_frame_offsets ( )
self . _init_frame_info ( )
self . _init_meta ( )
def abs_frame_id ( self , task_id : int , relative_id : int ) - > int :
task = self . _db_tasks [ task_id ]
if relative_id not in range ( 0 , task . data . size ) :
raise ValueError ( f " Unknown internal frame id { relative_id } " )
return relative_id * task . data . get_frame_step ( ) + task . data . start_frame + self . _task_frame_offsets [ task_id ]
def rel_frame_id ( self , task_id : int , absolute_id : int ) - > int :
task = self . _db_tasks [ task_id ]
d , m = divmod (
absolute_id - task . data . start_frame , task . data . get_frame_step ( ) )
if m or d not in range ( 0 , task . data . size ) :
raise ValueError ( f " Unknown frame { absolute_id } " )
return d
def _init_task_frame_offsets ( self ) :
self . _task_frame_offsets : Dict [ int , int ] = dict ( )
s = 0
subset = None
for task in self . _db_tasks . values ( ) :
if subset != task . subset :
s = 0
subset = task . subset
self . _task_frame_offsets [ task . id ] = s
s + = task . data . start_frame + task . data . get_frame_step ( ) * task . data . size
def _init_frame_info ( self ) :
self . _frame_info = dict ( )
original_names = DefaultDict [ Tuple [ str , str ] , int ] ( int )
for task in self . _db_tasks . values ( ) :
defaulted_subset = get_defaulted_subset ( task . subset , self . _subsets )
if hasattr ( task . data , ' video ' ) :
self . _frame_info . update ( { ( task . id , frame ) : {
" path " : " frame_ {:06d} " . format ( self . abs_frame_id ( task . id , frame ) ) ,
" width " : task . data . video . width ,
" height " : task . data . video . height ,
" subset " : defaulted_subset ,
} for frame in range ( task . data . size ) } )
else :
self . _frame_info . update ( { ( task . id , self . rel_frame_id ( task . id , db_image . frame ) ) : {
" path " : mangle_image_name ( db_image . path , defaulted_subset , original_names ) ,
" width " : db_image . width ,
" height " : db_image . height ,
" subset " : defaulted_subset
} for db_image in task . data . images . all ( ) } )
self . _frame_mapping = {
( self . _db_tasks [ frame_ident [ 0 ] ] . subset , self . _get_filename ( info [ " path " ] ) ) : frame_ident
for frame_ident , info in self . _frame_info . items ( )
}
def _init_meta ( self ) :
self . _meta = OrderedDict ( [
( ' project ' , OrderedDict ( [
( ' id ' , str ( self . _db_project . id ) ) ,
( ' name ' , self . _db_project . name ) ,
( " bugtracker " , self . _db_project . bug_tracker ) ,
( " created " , str ( timezone . localtime ( self . _db_project . created_date ) ) ) ,
( " updated " , str ( timezone . localtime ( self . _db_project . updated_date ) ) ) ,
( " tasks " , [
( ' task ' ,
TaskData . meta_for_task ( db_task , self . _host )
) for db_task in self . _db_tasks . values ( )
] ) ,
( " labels " , [
( " label " , OrderedDict ( [
( " name " , db_label . name ) ,
( " color " , db_label . color ) ,
( " attributes " , [
( " attribute " , OrderedDict ( [
( " name " , db_attr . name ) ,
( " mutable " , str ( db_attr . mutable ) ) ,
( " input_type " , db_attr . input_type ) ,
( " default_value " , db_attr . default_value ) ,
( " values " , db_attr . values ) ] ) )
for db_attr in db_label . attributespec_set . all ( ) ] )
] ) ) for db_label in self . _label_mapping . values ( )
] ) ,
( " owner " , OrderedDict ( [
( " username " , self . _db_project . owner . username ) ,
( " email " , self . _db_project . owner . email ) ,
] ) if self . _db_project . owner else " " ) ,
( " assignee " , OrderedDict ( [
( " username " , self . _db_project . assignee . username ) ,
( " email " , self . _db_project . assignee . email ) ,
] ) if self . _db_project . assignee else " " ) ,
] ) ) ,
( " dumped " , str ( timezone . localtime ( timezone . now ( ) ) ) )
] )
def _export_tracked_shape ( self , shape : dict , task_id : int ) :
return ProjectData . TrackedShape (
type = shape [ " type " ] ,
frame = self . abs_frame_id ( task_id , shape [ " frame " ] ) ,
label = self . _get_label_name ( shape [ " label_id " ] ) ,
points = shape [ " points " ] ,
occluded = shape [ " occluded " ] ,
z_order = shape . get ( " z_order " , 0 ) ,
group = shape . get ( " group " , 0 ) ,
outside = shape . get ( " outside " , False ) ,
keyframe = shape . get ( " keyframe " , True ) ,
track_id = shape [ " track_id " ] ,
source = shape . get ( " source " , " manual " ) ,
attributes = self . _export_attributes ( shape [ " attributes " ] ) ,
)
def _export_labeled_shape ( self , shape : dict , task_id : int ) :
return ProjectData . LabeledShape (
type = shape [ " type " ] ,
label = self . _get_label_name ( shape [ " label_id " ] ) ,
frame = self . abs_frame_id ( task_id , shape [ " frame " ] ) ,
points = shape [ " points " ] ,
occluded = shape [ " occluded " ] ,
z_order = shape . get ( " z_order " , 0 ) ,
group = shape . get ( " group " , 0 ) ,
source = shape [ " source " ] ,
attributes = self . _export_attributes ( shape [ " attributes " ] ) ,
task_id = task_id ,
)
def _export_tag ( self , tag : dict , task_id : int ) :
return ProjectData . Tag (
frame = self . abs_frame_id ( task_id , tag [ " frame " ] ) ,
label = self . _get_label_name ( tag [ " label_id " ] ) ,
group = tag . get ( " group " , 0 ) ,
source = tag [ " source " ] ,
attributes = self . _export_attributes ( tag [ " attributes " ] ) ,
task_id = task_id
)
def group_by_frame ( self , include_empty = False ) :
frames : Dict [ Tuple [ str , int ] , ProjectData . Frame ] = { }
def get_frame ( task_id : int , idx : int ) - > ProjectData . Frame :
frame_info = self . _frame_info [ ( task_id , idx ) ]
abs_frame = self . abs_frame_id ( task_id , idx )
if ( frame_info [ " subset " ] , abs_frame ) not in frames :
frames [ ( frame_info [ " subset " ] , abs_frame ) ] = ProjectData . Frame (
task_id = task_id ,
subset = frame_info [ " subset " ] ,
idx = idx ,
frame = abs_frame ,
name = frame_info [ " path " ] ,
height = frame_info [ " height " ] ,
width = frame_info [ " width " ] ,
labeled_shapes = [ ] ,
tags = [ ] ,
)
return frames [ ( frame_info [ " subset " ] , abs_frame ) ]
if include_empty :
for ident in self . _frame_info :
get_frame ( * ident )
for task in self . _db_tasks . values ( ) :
anno_manager = AnnotationManager ( self . _annotation_irs [ task . id ] )
for shape in sorted ( anno_manager . to_shapes ( task . data . size ) ,
key = lambda shape : shape . get ( " z_order " , 0 ) ) :
if ( task . id , shape [ ' frame ' ] ) not in self . _frame_info :
continue
if ' track_id ' in shape :
if shape [ ' outside ' ] :
continue
exported_shape = self . _export_tracked_shape ( shape , task . id )
else :
exported_shape = self . _export_labeled_shape ( shape , task . id )
get_frame ( task . id , shape [ ' frame ' ] ) . labeled_shapes . append ( exported_shape )
for tag in self . _annotation_irs [ task . id ] . tags :
get_frame ( task . id , tag [ ' frame ' ] ) . tags . append ( self . _export_tag ( tag , task . id ) )
return iter ( frames . values ( ) )
@property
def shapes ( self ) :
for task in self . _db_tasks . values ( ) :
for shape in self . _annotation_irs [ task . id ] . shapes :
yield self . _export_labeled_shape ( shape , task . id )
@property
def tracks ( self ) :
idx = 0
for task in self . _db_tasks . values ( ) :
for track in self . _annotation_irs [ task . id ] . tracks :
tracked_shapes = TrackManager . get_interpolated_shapes (
track , 0 , task . data . size
)
for tracked_shape in tracked_shapes :
tracked_shape [ " attributes " ] + = track [ " attributes " ]
tracked_shape [ " track_id " ] = idx
tracked_shape [ " group " ] = track [ " group " ]
tracked_shape [ " source " ] = track [ " source " ]
tracked_shape [ " label_id " ] = track [ " label_id " ]
yield ProjectData . Track (
label = self . _get_label_name ( track [ " label_id " ] ) ,
group = track [ " group " ] ,
source = track [ " source " ] ,
shapes = [ self . _export_tracked_shape ( shape , task . id )
for shape in tracked_shapes ] ,
task_id = task . id
)
idx + = 1
@property
def tags ( self ) :
for task in self . _db_tasks . values ( ) :
for tag in self . _annotation_irs [ task . id ] . tags :
yield self . _export_tag ( tag , task . id )
@property
def meta ( self ) :
return self . _meta
@property
def data ( self ) :
raise NotImplementedError ( )
@property
def frame_info ( self ) :
return self . _frame_info
@property
def frame_step ( self ) :
return self . _frame_steps
@property
def db_project ( self ) :
return self . _db_project
@property
def subsets ( self ) - > List [ str ] :
return self . _subsets
@property
def tasks ( self ) :
return list ( self . _db_tasks . values ( ) )
@property
def task_data ( self ) :
for task_id , task in self . _db_tasks . items ( ) :
yield TaskData ( self . _annotation_irs [ task_id ] , task , self . _host )
@staticmethod
def _get_filename ( path ) :
return osp . splitext ( path ) [ 0 ]
class CVATDataExtractorMixin :
def __init__ ( self ) :
super ( ) . __init__ ( )
def categories ( self ) - > dict :
raise NotImplementedError ( )
@staticmethod
def _load_categories ( labels : list ) :
categories : Dict [ datumaro . AnnotationType , datumaro . Categories ] = { }
label_categories = datumaro . LabelCategories ( attributes = [ ' occluded ' ] )
for _ , label in labels :
label_categories . add ( label [ ' name ' ] )
for _ , attr in label [ ' attributes ' ] :
label_categories . attributes . add ( attr [ ' name ' ] )
categories [ datumaro . AnnotationType . label ] = label_categories
return categories
def _read_cvat_anno ( self , cvat_frame_anno : Union [ ProjectData . Frame , TaskData . Frame ] , labels : list ) :
categories = self . categories ( )
label_cat = categories [ datumaro . AnnotationType . label ]
def map_label ( name ) : return label_cat . find ( name ) [ 0 ]
label_attrs = {
label [ ' name ' ] : label [ ' attributes ' ]
for _ , label in labels
}
return convert_cvat_anno_to_dm ( cvat_frame_anno , label_attrs , map_label )
class CvatTaskDataExtractor ( datumaro . SourceExtractor , CVATDataExtractorMixin ) :
def __init__ ( self , task_data , include_images = False , format_type = None , dimension = DimensionType . DIM_2D ) :
super ( ) . __init__ ( )
self . _categories , self . _user = self . _load_categories ( task_data , dimension = dimension )
@ -537,12 +879,13 @@ class CvatTaskDataExtractor(datumaro.SourceExtractor):
dm_image = _make_image ( frame_data . idx , * * image_args )
else :
dm_image = Image ( * * image_args )
dm_anno = self . _read_cvat_anno ( frame_data , task_data )
dm_anno = self . _read_cvat_anno ( frame_data , task_data . meta [ ' task ' ] [ ' labels ' ] )
if dimension == DimensionType . DIM_2D :
dm_item = datumaro . DatasetItem ( id = osp . splitext ( frame_data . name ) [ 0 ] ,
annotations = dm_anno , image = dm_image ,
attributes = { ' frame ' : frame_data . frame } )
annotations = dm_anno , image = dm_image ,
attributes = { ' frame ' : frame_data . frame
} )
elif dimension == DimensionType . DIM_3D :
attributes = { ' frame ' : frame_data . frame }
if format_type == " sly_pointcloud " :
@ -564,18 +907,8 @@ class CvatTaskDataExtractor(datumaro.SourceExtractor):
self . _items = dm_items
def __iter__ ( self ) :
for item in self . _items :
yield item
def __len__ ( self ) :
return len ( self . _items )
def categories ( self ) :
return self . _categories
@staticmethod
def _load_categories ( cvat_anno , dimension ) :
def _load_categories ( cvat_anno , dimension ) : # pylint: disable=arguments-differ
categories = { }
label_categories = datumaro . LabelCategories ( attributes = [ ' occluded ' ] )
@ -595,102 +928,209 @@ class CvatTaskDataExtractor(datumaro.SourceExtractor):
return categories , user_info
def _read_cvat_anno ( self , cvat_frame_anno , task_data ) :
item_anno = [ ]
def _read_cvat_anno ( self , cvat_frame_anno : TaskData . Frame , labels : list ) :
categories = self . categories ( )
label_cat = categories [ datumaro . AnnotationType . label ]
def map_label ( name ) : return label_cat . find ( name ) [ 0 ]
label_attrs = {
label [ ' name ' ] : label [ ' attributes ' ]
for _ , label in task_data. meta [ ' task ' ] [ ' labels' ]
for _ , label in labels
}
def convert_attrs ( label , cvat_attrs ) :
cvat_attrs = { a . name : a . value for a in cvat_attrs }
dm_attr = dict ( )
for _ , a_desc in label_attrs [ label ] :
a_name = a_desc [ ' name ' ]
a_value = cvat_attrs . get ( a_name , a_desc [ ' default_value ' ] )
try :
if a_desc [ ' input_type ' ] == AttributeType . NUMBER :
a_value = float ( a_value )
elif a_desc [ ' input_type ' ] == AttributeType . CHECKBOX :
a_value = ( a_value . lower ( ) == ' true ' )
dm_attr [ a_name ] = a_value
except Exception as e :
raise Exception (
" Failed to convert attribute ' %s ' = ' %s ' : %s " %
( a_name , a_value , e ) )
if self . _format_type == " sly_pointcloud " and ( a_desc . get ( ' input_type ' ) == ' select ' or a_desc . get ( ' input_type ' ) == ' radio ' ) :
dm_attr [ f " { a_name } __values " ] = a_desc [ " values " ]
return dm_attr
for tag_obj in cvat_frame_anno . tags :
anno_group = tag_obj . group or 0
anno_label = map_label ( tag_obj . label )
anno_attr = convert_attrs ( tag_obj . label , tag_obj . attributes )
anno = datumaro . Label ( label = anno_label ,
attributes = anno_attr , group = anno_group )
item_anno . append ( anno )
shapes = [ ]
for shape in cvat_frame_anno . shapes :
shapes . append ( { " id " : shape . id , " label_id " : shape . label_id } )
return convert_cvat_anno_to_dm ( cvat_frame_anno , label_attrs , map_label , self . _format_type , self . _dimension )
for index , shape_obj in enumerate ( cvat_frame_anno . labeled_shapes ) :
anno_group = shape_obj . group or 0
anno_label = map_label ( shape_obj . label )
anno_attr = convert_attrs ( shape_obj . label , shape_obj . attributes )
anno_attr [ ' occluded ' ] = shape_obj . occluded
if hasattr ( shape_obj , ' track_id ' ) :
anno_attr [ ' track_id ' ] = shape_obj . track_id
anno_attr [ ' keyframe ' ] = shape_obj . keyframe
anno_points = shape_obj . points
if shape_obj . type == ShapeType . POINTS :
anno = datumaro . Points ( anno_points ,
label = anno_label , attributes = anno_attr , group = anno_group ,
z_order = shape_obj . z_order )
elif shape_obj . type == ShapeType . POLYLINE :
anno = datumaro . PolyLine ( anno_points ,
label = anno_label , attributes = anno_attr , group = anno_group ,
z_order = shape_obj . z_order )
elif shape_obj . type == ShapeType . POLYGON :
anno = datumaro . Polygon ( anno_points ,
label = anno_label , attributes = anno_attr , group = anno_group ,
z_order = shape_obj . z_order )
elif shape_obj . type == ShapeType . RECTANGLE :
x0 , y0 , x1 , y1 = anno_points
anno = datumaro . Bbox ( x0 , y0 , x1 - x0 , y1 - y0 ,
label = anno_label , attributes = anno_attr , group = anno_group ,
z_order = shape_obj . z_order )
elif shape_obj . type == ShapeType . CUBOID :
if self . _dimension == DimensionType . DIM_3D :
if self . _format_type == " sly_pointcloud " :
anno_id = shapes [ index ] [ " id " ]
anno_attr [ " label_id " ] = shapes [ index ] [ " label_id " ]
else :
anno_id = index
position , rotation , scale = anno_points [ 0 : 3 ] , anno_points [ 3 : 6 ] , anno_points [ 6 : 9 ]
anno = datumaro . Cuboid3d ( id = anno_id , position = position , rotation = rotation , scale = scale ,
label = anno_label , attributes = anno_attr , group = anno_group
)
class CVATProjectDataExtractor ( datumaro . Extractor , CVATDataExtractorMixin ) :
def __init__ ( self , project_data : ProjectData , include_images : bool = False ) :
super ( ) . __init__ ( )
self . _categories = self . _load_categories ( project_data . meta [ ' project ' ] [ ' labels ' ] )
dm_items : List [ datumaro . DatasetItem ] = [ ]
ext_per_task : Dict [ int , str ] = { }
image_maker_per_task : Dict [ int , Callable ] = { }
for task in project_data . tasks :
is_video = task . mode == ' interpolation '
ext_per_task [ task . id ] = FrameProvider . VIDEO_FRAME_EXT if is_video else ' '
if include_images :
frame_provider = FrameProvider ( task . data )
if is_video :
# optimization for videos: use numpy arrays instead of bytes
# some formats or transforms can require image data
def image_maker_factory ( frame_provider ) :
def _make_image ( i , * * kwargs ) :
loader = lambda _ : frame_provider . get_frame ( i ,
quality = frame_provider . Quality . ORIGINAL ,
out_type = frame_provider . Type . NUMPY_ARRAY ) [ 0 ]
return Image ( loader = loader , * * kwargs )
return _make_image
else :
continue
# for images use encoded data to avoid recoding
def image_maker_factory ( frame_provider ) :
def _make_image ( i , * * kwargs ) :
loader = lambda _ : frame_provider . get_frame ( i ,
quality = frame_provider . Quality . ORIGINAL ,
out_type = frame_provider . Type . BUFFER ) [ 0 ] . getvalue ( )
return ByteImage ( data = loader , * * kwargs )
return _make_image
image_maker_per_task [ task . id ] = image_maker_factory ( frame_provider )
for frame_data in project_data . group_by_frame ( include_empty = True ) :
image_args = {
' path ' : frame_data . name + ext_per_task [ frame_data . task_id ] ,
' size ' : ( frame_data . height , frame_data . width ) ,
}
if include_images :
dm_image = image_maker_per_task [ frame_data . task_id ] ( frame_data . idx , * * image_args )
else :
raise Exception ( " Unknown shape type ' %s ' " % shape_obj . type )
dm_image = Image ( * * image_args )
dm_anno = self . _read_cvat_anno ( frame_data , project_data . meta [ ' project ' ] [ ' labels ' ] )
dm_item = datumaro . DatasetItem ( id = osp . splitext ( frame_data . name ) [ 0 ] ,
annotations = dm_anno , image = dm_image ,
subset = frame_data . subset ,
attributes = { ' frame ' : frame_data . frame }
)
dm_items . append ( dm_item )
item_anno . append ( anno )
self . _items = dm_items
return item_anno
def categories ( self ) :
return self . _categories
def __iter__ ( self ) :
yield from self . _items
def __len__ ( self ) :
return len ( self . _items )
def GetCVATDataExtractor ( instance_data : Union [ ProjectData , TaskData ] , include_images : bool = False ) :
if isinstance ( instance_data , ProjectData ) :
return CVATProjectDataExtractor ( instance_data , include_images )
else :
return CvatTaskDataExtractor ( instance_data , include_images )
class CvatImportError ( Exception ) :
pass
def mangle_image_name ( name : str , subset : str , names : DefaultDict [ Tuple [ str , str ] , int ] ) - > str :
name , ext = name . rsplit ( osp . extsep , maxsplit = 1 )
if not names [ ( subset , name ) ] :
names [ ( subset , name ) ] + = 1
return osp . extsep . join ( [ name , ext ] )
else :
image_name = f " { name } _ { names [ ( subset , name ) ] } "
if not names [ ( subset , image_name ) ] :
names [ ( subset , name ) ] + = 1
return osp . extsep . join ( [ image_name , ext ] )
else :
i = 1
while i < sys . maxsize :
new_image_name = f " { image_name } _ { i } "
if not names [ ( subset , new_image_name ) ] :
names [ ( subset , name ) ] + = 1
return osp . extsep . join ( [ new_image_name , ext ] )
i + = 1
raise Exception ( ' Cannot mangle image name ' )
def get_defaulted_subset ( subset : str , subsets : List [ str ] ) - > str :
if subset :
return subset
else :
if datumaro . DEFAULT_SUBSET_NAME not in subsets :
return datumaro . DEFAULT_SUBSET_NAME
else :
i = 1
while i < sys . maxsize :
if f ' { datumaro . DEFAULT_SUBSET_NAME } _ { i } ' not in subsets :
return f ' { datumaro . DEFAULT_SUBSET_NAME } _ { i } '
i + = 1
raise Exception ( ' Cannot find default name for subset ' )
def convert_cvat_anno_to_dm ( cvat_frame_anno , label_attrs , map_label , format_name = None , dimension = DimensionType . DIM_2D ) :
item_anno = [ ]
def convert_attrs ( label , cvat_attrs ) :
cvat_attrs = { a . name : a . value for a in cvat_attrs }
dm_attr = dict ( )
for _ , a_desc in label_attrs [ label ] :
a_name = a_desc [ ' name ' ]
a_value = cvat_attrs . get ( a_name , a_desc [ ' default_value ' ] )
try :
if a_desc [ ' input_type ' ] == AttributeType . NUMBER :
a_value = float ( a_value )
elif a_desc [ ' input_type ' ] == AttributeType . CHECKBOX :
a_value = ( a_value . lower ( ) == ' true ' )
dm_attr [ a_name ] = a_value
except Exception as e :
raise Exception (
" Failed to convert attribute ' %s ' = ' %s ' : %s " %
( a_name , a_value , e ) )
return dm_attr
for tag_obj in cvat_frame_anno . tags :
anno_group = tag_obj . group or 0
anno_label = map_label ( tag_obj . label )
anno_attr = convert_attrs ( tag_obj . label , tag_obj . attributes )
anno = datumaro . Label ( label = anno_label ,
attributes = anno_attr , group = anno_group )
item_anno . append ( anno )
shapes = [ ]
if hasattr ( cvat_frame_anno , ' shapes ' ) :
for shape in cvat_frame_anno . shapes :
shapes . append ( { " id " : shape . id , " label_id " : shape . label_id } )
for index , shape_obj in enumerate ( cvat_frame_anno . labeled_shapes ) :
anno_group = shape_obj . group or 0
anno_label = map_label ( shape_obj . label )
anno_attr = convert_attrs ( shape_obj . label , shape_obj . attributes )
anno_attr [ ' occluded ' ] = shape_obj . occluded
if hasattr ( shape_obj , ' track_id ' ) :
anno_attr [ ' track_id ' ] = shape_obj . track_id
anno_attr [ ' keyframe ' ] = shape_obj . keyframe
anno_points = shape_obj . points
if shape_obj . type == ShapeType . POINTS :
anno = datumaro . Points ( anno_points ,
label = anno_label , attributes = anno_attr , group = anno_group ,
z_order = shape_obj . z_order )
elif shape_obj . type == ShapeType . POLYLINE :
anno = datumaro . PolyLine ( anno_points ,
label = anno_label , attributes = anno_attr , group = anno_group ,
z_order = shape_obj . z_order )
elif shape_obj . type == ShapeType . POLYGON :
anno = datumaro . Polygon ( anno_points ,
label = anno_label , attributes = anno_attr , group = anno_group ,
z_order = shape_obj . z_order )
elif shape_obj . type == ShapeType . RECTANGLE :
x0 , y0 , x1 , y1 = anno_points
anno = datumaro . Bbox ( x0 , y0 , x1 - x0 , y1 - y0 ,
label = anno_label , attributes = anno_attr , group = anno_group ,
z_order = shape_obj . z_order )
elif shape_obj . type == ShapeType . CUBOID :
if dimension == DimensionType . DIM_3D :
if format_name == " sly_pointcloud " :
anno_id = shapes [ index ] [ " id " ]
else :
anno_id = index
position , rotation , scale = anno_points [ 0 : 3 ] , anno_points [ 3 : 6 ] , anno_points [ 6 : 9 ]
anno = datumaro . Cuboid3d ( id = anno_id , position = position , rotation = rotation , scale = scale ,
label = anno_label , attributes = anno_attr , group = anno_group
)
else :
continue
else :
raise Exception ( " Unknown shape type ' %s ' " % shape_obj . type )
item_anno . append ( anno )
return item_anno
def match_dm_item ( item , task_data , root_hint = None ) :
is_video = task_data . meta [ ' task ' ] [ ' mode ' ] == ' interpolation '