@ -4,18 +4,23 @@
# SPDX-License-Identifier: MIT
from collections import OrderedDict , defaultdict
from enum import Enum
from itertools import chain
import logging as log
from lxml import etree as ET
import os
import os . path as osp
from datumaro . components . converter import Converter
from datumaro . components . extractor import DEFAULT_SUBSET_NAME , AnnotationType
from datumaro . components . formats . voc import VocLabel , VocAction , \
VocBodyPart , VocPose , VocTask , VocPath , VocColormap , VocInstColormap
from datumaro . util import find
from datumaro . components . extractor import ( DEFAULT_SUBSET_NAME , AnnotationType ,
LabelCategories
)
from datumaro . components . formats . voc import ( VocTask , VocPath ,
VocInstColormap , VocPose ,
parse_label_map , make_voc_label_map , make_voc_categories , write_label_map
)
from datumaro . util . image import save_image
from datumaro . util . mask_tools import apply_colormap
from datumaro . util . mask_tools import apply_colormap , remap_mask
def _write_xml_bbox ( bbox , parent_elem ) :
@ -27,13 +32,12 @@ def _write_xml_bbox(bbox, parent_elem):
ET . SubElement ( bbox_elem , ' ymax ' ) . text = str ( y + h )
return bbox_elem
class _Converter :
_LABELS = set ( [ entry . name for entry in VocLabel ] )
_BODY_PARTS = set ( [ entry . name for entry in VocBodyPart ] )
_ACTIONS = set ( [ entry . name for entry in VocAction ] )
LabelmapType = Enum ( ' LabelmapType ' , [ ' voc ' , ' source ' , ' guess ' ] )
class _Converter :
def __init__ ( self , extractor , save_dir ,
tasks = None , apply_colormap = True , save_images = False ) :
tasks = None , apply_colormap = True , save_images = False , label_map = None ):
assert tasks is None or isinstance ( tasks , ( VocTask , list ) )
if tasks is None :
tasks = list ( VocTask )
@ -49,14 +53,12 @@ class _Converter:
self . _apply_colormap = apply_colormap
self . _save_images = save_images
self . _label_categories = extractor . categories ( ) \
. get ( AnnotationType . label )
self . _mask_categories = extractor . categories ( ) \
. get ( AnnotationType . mask )
self . _load_categories ( label_map )
def convert ( self ) :
self . init_dirs ( )
self . save_subsets ( )
self . save_label_map ( )
def init_dirs ( self ) :
save_dir = self . _save_dir
@ -94,7 +96,8 @@ class _Converter:
self . _images_dir = images_dir
def get_label ( self , label_id ) :
return self . _label_categories . items [ label_id ] . name
return self . _extractor . categories ( ) [ AnnotationType . label ] \
. items [ label_id ] . name
def save_subsets ( self ) :
subsets = self . _extractor . subsets ( )
@ -167,56 +170,64 @@ class _Converter:
layout_bboxes = [ ]
for bbox in bboxes :
label = self . get_label ( bbox . label )
if label in self . _LABELS :
main_bboxes . append ( bbox )
elif label in self . _BODY_PARTS :
if self . _is_part ( label ) :
layout_bboxes . append ( bbox )
elif self . _is_label ( label ) :
main_bboxes . append ( bbox )
for new_obj_id , obj in enumerate ( main_bboxes ) :
attr = obj . attributes
obj_elem = ET . SubElement ( root_elem , ' object ' )
ET . SubElement ( obj_elem , ' name ' ) . text = self . get_label ( obj . label )
obj_label = self . get_label ( obj . label )
ET . SubElement ( obj_elem , ' name ' ) . text = obj_label
pose = attr . get ( ' pose ' )
if pose is not None :
ET . SubElement ( obj_elem , ' pose ' ) . text = VocPose [ pose ] . name
pose = VocPose [ pose ]
else :
pose = VocPose . Unspecified
ET . SubElement ( obj_elem , ' pose ' ) . text = pose . name
truncated = attr . get ( ' truncated ' )
if truncated is not None :
truncated = int ( truncated )
else :
truncated = 0
ET . SubElement ( obj_elem , ' truncated ' ) . text = ' %d ' % truncated
difficult = attr . get ( ' difficult ' )
if difficult is not None :
difficult = int ( difficult )
else :
difficult = 0
ET . SubElement ( obj_elem , ' difficult ' ) . text = ' %d ' % difficult
bbox = obj . get_bbox ( )
if bbox is not None :
_write_xml_bbox ( bbox , obj_elem )
for part in VocBodyPart :
part_bbox = find ( layout_bboxes , lambda x : \
obj . id == x . group and \
self . get_label ( x . label ) == part . name )
if part_bbox is not None :
for part_bbox in filter ( lambda x : obj . id == x . group ,
layout_bboxes ) :
part_elem = ET . SubElement ( obj_elem , ' part ' )
ET . SubElement ( part_elem , ' name ' ) . text = part . name
ET . SubElement ( part_elem , ' name ' ) . text = \
self . get_label ( part_bbox . label )
_write_xml_bbox ( part_bbox . get_bbox ( ) , part_elem )
objects_with_parts . append ( new_obj_id )
actions = [ x for x in labels
if obj . id == x . group and \
self . get_label ( x . label ) in self . _ACTIONS ]
if len ( actions ) != 0 :
actions_elem = ET . SubElement ( obj_elem , ' actions ' )
for action in VocAction :
presented = find ( actions , lambda x : \
self . get_label ( x . label ) == action . name ) is not None
ET . SubElement ( actions_elem , action . name ) . text = \
actions = { k : v for k , v in obj . attributes . items ( )
if self . _is_action ( obj_label , k ) }
actions_elem = ET . Element ( ' actions ' )
for action in self . _get_actions ( obj_label ) :
presented = action in actions and actions [ action ]
ET . SubElement ( actions_elem , action ) . text = \
' %d ' % presented
objects_with_actions [ new_obj_id ] [ action ] = presented
if len ( actions ) != 0 :
obj_elem . append ( actions_elem )
if set ( self . _tasks ) & set ( [ None ,
VocTask . detection ,
@ -232,7 +243,7 @@ class _Converter:
for label_obj in labels :
label = self . get_label ( label_obj . label )
if label not in self . _LABELS :
if not self . _is_label ( label ) :
continue
class_list = class_lists . get ( item_id , set ( ) )
class_list . add ( label_obj . label )
@ -244,7 +255,7 @@ class _Converter:
if mask_obj . attributes . get ( ' class ' ) == True :
self . save_segm ( osp . join ( self . _segm_dir ,
item_id + VocPath . SEGM_EXT ) ,
mask_obj , self . _mask_categories . colormap )
mask_obj )
if mask_obj . attributes . get ( ' instances ' ) == True :
self . save_segm ( osp . join ( self . _inst_dir ,
item_id + VocPath . SEGM_EXT ) ,
@ -284,9 +295,11 @@ class _Converter:
if len ( action_list ) == 0 :
return
for action in VocAction :
all_actions = set ( chain ( * ( self . _get_actions ( l )
for l in self . _label_map ) ) )
for action in all_actions :
ann_file = osp . join ( self . _action_subsets_dir ,
' %s _ %s .txt ' % ( action . name , subset_name ) )
' %s _ %s .txt ' % ( action , subset_name ) )
with open ( ann_file , ' w ' ) as f :
for item , objs in action_list . items ( ) :
if not objs :
@ -302,23 +315,17 @@ class _Converter:
if len ( class_lists ) == 0 :
return
label_cat = self . _extractor . categories ( ) . get ( AnnotationType . label , None )
if not label_cat :
log . warn ( " Unable to save classification task lists "
" as source does not provide class labels. Skipped. " )
return
for label in VocLabel :
for label in self . _label_map :
ann_file = osp . join ( self . _cls_subsets_dir ,
' %s _ %s .txt ' % ( label . name , subset_name ) )
' %s _ %s .txt ' % ( label , subset_name ) )
with open ( ann_file , ' w ' ) as f :
for item , item_labels in class_lists . items ( ) :
if not item_labels :
continue
item_labels = [ label_cat . items [ l ] . name for l in item_labels ]
presented = label . name in item_labels
f. write ( ' %s % d \n ' % \
( item , 1 if presented else - 1 ) )
item_labels = [ self . _strip_label ( self . get_label ( l ) )
for l in item_labels ]
presented = label in item_labels
f . write ( ' %s % d\n ' % ( item , 1 if presented else - 1 ) )
def save_clsdet_lists ( self , subset_name , clsdet_list ) :
os . makedirs ( self . _cls_subsets_dir , exist_ok = True )
@ -348,17 +355,124 @@ class _Converter:
else :
f . write ( ' %s \n ' % ( item ) )
def save_segm ( self , path , annotation , colormap ) :
def save_segm ( self , path , annotation , colormap = None ) :
data = annotation . image
if self . _apply_colormap :
if colormap is None :
colormap = VocColormap
colormap = self . _categories [ AnnotationType . mask ] . colormap
data = self . _remap_mask ( data )
data = apply_colormap ( data , colormap )
save_image ( path , data )
def save_label_map ( self ) :
path = osp . join ( self . _save_dir , VocPath . LABELMAP_FILE )
write_label_map ( path , self . _label_map )
@staticmethod
def _strip_label ( label ) :
return label . lower ( ) . strip ( )
def _load_categories ( self , label_map_source = None ) :
if label_map_source == LabelmapType . voc . name :
# strictly use VOC default labelmap
label_map = make_voc_label_map ( )
elif label_map_source == LabelmapType . source . name :
# generate colormap from the input dataset
labels = self . _extractor . categories ( ) \
. get ( AnnotationType . label , LabelCategories ( ) )
label_map = OrderedDict (
( item . name , [ None , [ ] , [ ] ] ) for item in labels . items )
elif label_map_source in [ LabelmapType . guess . name , None ] :
# generate colormap for union of VOC and input dataset labels
label_map = make_voc_label_map ( )
rebuild_colormap = False
source_labels = self . _extractor . categories ( ) \
. get ( AnnotationType . label , LabelCategories ( ) )
for label in source_labels . items :
label_name = self . _strip_label ( label . name )
if label_name not in label_map :
rebuild_colormap = True
if label . attributes or label_name not in label_map :
label_map [ label_name ] = [ None , [ ] , label . attributes ]
if rebuild_colormap :
for item in label_map . values ( ) :
item [ 0 ] = None
elif isinstance ( label_map_source , dict ) :
label_map = label_map_source
elif isinstance ( label_map_source , str ) and osp . isfile ( label_map_source ) :
label_map = parse_label_map ( label_map_source )
else :
raise Exception ( " Wrong labelmap specified, "
" expected one of %s or a file path " % \
' , ' . join ( t . name for t in LabelmapType ) )
self . _categories = make_voc_categories ( label_map )
self . _label_map = label_map
colormap = self . _categories [ AnnotationType . mask ] . colormap
for label_id , color in colormap . items ( ) :
label_desc = label_map [
self . _categories [ AnnotationType . label ] . items [ label_id ] . name ]
label_desc [ 0 ] = color
self . _label_id_mapping = self . _make_label_id_map ( )
def _is_label ( self , s ) :
return self . _label_map . get ( self . _strip_label ( s ) ) is not None
def _is_part ( self , s ) :
s = self . _strip_label ( s )
for label_desc in self . _label_map . values ( ) :
if s in label_desc [ 1 ] :
return True
return False
def _is_action ( self , label , s ) :
return self . _strip_label ( s ) in self . _get_actions ( label )
def _get_actions ( self , label ) :
label_desc = self . _label_map . get ( self . _strip_label ( label ) )
if not label_desc :
return [ ]
return label_desc [ 2 ]
def _make_label_id_map ( self ) :
source_labels = {
id : label . name for id , label in
enumerate ( self . _extractor . categories ( ) [ AnnotationType . label ] . items )
}
target_labels = {
label . name : id for id , label in
enumerate ( self . _categories [ AnnotationType . label ] . items )
}
id_mapping = {
src_id : target_labels . get ( src_label , 0 )
for src_id , src_label in source_labels . items ( )
}
void_labels = [ src_label for src_id , src_label in source_labels . items ( )
if src_label not in target_labels ]
if void_labels :
log . warn ( " The following labels are remapped to background: %s " %
' , ' . join ( void_labels ) )
def map_id ( src_id ) :
return id_mapping [ src_id ]
return map_id
def _remap_mask ( self , mask ) :
return remap_mask ( mask , self . _label_id_mapping )
class VocConverter ( Converter ) :
def __init__ ( self ,
tasks = None , save_images = False , apply_colormap = False ,
tasks = None , save_images = False , apply_colormap = False , label_map = None ,
cmdline_args = None ) :
super ( ) . __init__ ( )
@ -366,6 +480,7 @@ class VocConverter(Converter):
' tasks ' : tasks ,
' save_images ' : save_images ,
' apply_colormap ' : apply_colormap ,
' label_map ' : label_map ,
}
if cmdline_args is not None :
@ -375,6 +490,12 @@ class VocConverter(Converter):
def _split_tasks_string ( s ) :
return [ VocTask [ i . strip ( ) ] for i in s . split ( ' , ' ) ]
@staticmethod
def _get_labelmap ( s ) :
if osp . isfile ( s ) :
return s
return LabelmapType [ s ] . name
@classmethod
def build_cmdline_parser ( cls , parser = None ) :
import argparse
@ -386,6 +507,9 @@ class VocConverter(Converter):
parser . add_argument ( ' --apply-colormap ' , type = bool , default = True ,
help = " Use colormap for class and instance masks "
" (default: %(default)s ) " )
parser . add_argument ( ' --label-map ' , type = cls . _get_labelmap , default = None ,
help = " Labelmap file path or one of %s " % \
' , ' . join ( t . name for t in LabelmapType ) )
parser . add_argument ( ' --tasks ' , type = cls . _split_tasks_string ,
default = None ,
help = " VOC task filter, comma-separated list of { %s } "