|
|
|
|
@ -4,38 +4,27 @@
|
|
|
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
|
|
|
|
|
|
from lxml import etree as ET # NOTE: lxml has proper XPath implementation
|
|
|
|
|
from datumaro.components.extractor import (DatasetItem, Annotation,
|
|
|
|
|
from datumaro.components.extractor import (DatasetItem, Extractor,
|
|
|
|
|
Annotation, AnnotationType,
|
|
|
|
|
LabelObject, MaskObject, PointsObject, PolygonObject,
|
|
|
|
|
PolyLineObject, BboxObject, CaptionObject,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _cast(value, type_conv, default=None):
|
|
|
|
|
if value is None:
|
|
|
|
|
return default
|
|
|
|
|
try:
|
|
|
|
|
return type_conv(value)
|
|
|
|
|
except Exception:
|
|
|
|
|
return default
|
|
|
|
|
|
|
|
|
|
class DatasetItemEncoder:
|
|
|
|
|
def encode_item(self, item):
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode(cls, item, categories=None):
|
|
|
|
|
item_elem = ET.Element('item')
|
|
|
|
|
ET.SubElement(item_elem, 'id').text = str(item.id)
|
|
|
|
|
ET.SubElement(item_elem, 'subset').text = str(item.subset)
|
|
|
|
|
|
|
|
|
|
# Dataset wrapper-specific
|
|
|
|
|
ET.SubElement(item_elem, 'source').text = \
|
|
|
|
|
str(getattr(item, 'source', None))
|
|
|
|
|
ET.SubElement(item_elem, 'extractor').text = \
|
|
|
|
|
str(getattr(item, 'extractor', None))
|
|
|
|
|
ET.SubElement(item_elem, 'path').text = str('/'.join(item.path))
|
|
|
|
|
|
|
|
|
|
image = item.image
|
|
|
|
|
if image is not None:
|
|
|
|
|
item_elem.append(self.encode_image(image))
|
|
|
|
|
item_elem.append(cls.encode_image(image))
|
|
|
|
|
|
|
|
|
|
for ann in item.annotations:
|
|
|
|
|
item_elem.append(self.encode_object(ann))
|
|
|
|
|
item_elem.append(cls.encode_annotation(ann, categories))
|
|
|
|
|
|
|
|
|
|
return item_elem
|
|
|
|
|
|
|
|
|
|
@ -52,7 +41,7 @@ class DatasetItemEncoder:
|
|
|
|
|
return image_elem
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode_annotation(cls, annotation):
|
|
|
|
|
def encode_annotation_base(cls, annotation):
|
|
|
|
|
assert isinstance(annotation, Annotation)
|
|
|
|
|
ann_elem = ET.Element('annotation')
|
|
|
|
|
ET.SubElement(ann_elem, 'id').text = str(annotation.id)
|
|
|
|
|
@ -65,18 +54,31 @@ class DatasetItemEncoder:
|
|
|
|
|
|
|
|
|
|
return ann_elem
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _get_label(label_id, categories):
|
|
|
|
|
label = ''
|
|
|
|
|
if categories is not None:
|
|
|
|
|
label_cat = categories.get(AnnotationType.label)
|
|
|
|
|
if label_cat is not None:
|
|
|
|
|
label = label_cat.items[label_id].name
|
|
|
|
|
return label
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode_label_object(cls, obj):
|
|
|
|
|
ann_elem = cls.encode_annotation(obj)
|
|
|
|
|
def encode_label_object(cls, obj, categories):
|
|
|
|
|
ann_elem = cls.encode_annotation_base(obj)
|
|
|
|
|
|
|
|
|
|
ET.SubElement(ann_elem, 'label').text = \
|
|
|
|
|
str(cls._get_label(obj.label, categories))
|
|
|
|
|
ET.SubElement(ann_elem, 'label_id').text = str(obj.label)
|
|
|
|
|
|
|
|
|
|
return ann_elem
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode_mask_object(cls, obj):
|
|
|
|
|
ann_elem = cls.encode_annotation(obj)
|
|
|
|
|
def encode_mask_object(cls, obj, categories):
|
|
|
|
|
ann_elem = cls.encode_annotation_base(obj)
|
|
|
|
|
|
|
|
|
|
ET.SubElement(ann_elem, 'label').text = \
|
|
|
|
|
str(cls._get_label(obj.label, categories))
|
|
|
|
|
ET.SubElement(ann_elem, 'label_id').text = str(obj.label)
|
|
|
|
|
|
|
|
|
|
mask = obj.image
|
|
|
|
|
@ -86,9 +88,11 @@ class DatasetItemEncoder:
|
|
|
|
|
return ann_elem
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode_bbox_object(cls, obj):
|
|
|
|
|
ann_elem = cls.encode_annotation(obj)
|
|
|
|
|
def encode_bbox_object(cls, obj, categories):
|
|
|
|
|
ann_elem = cls.encode_annotation_base(obj)
|
|
|
|
|
|
|
|
|
|
ET.SubElement(ann_elem, 'label').text = \
|
|
|
|
|
str(cls._get_label(obj.label, categories))
|
|
|
|
|
ET.SubElement(ann_elem, 'label_id').text = str(obj.label)
|
|
|
|
|
ET.SubElement(ann_elem, 'x').text = str(obj.x)
|
|
|
|
|
ET.SubElement(ann_elem, 'y').text = str(obj.y)
|
|
|
|
|
@ -99,9 +103,11 @@ class DatasetItemEncoder:
|
|
|
|
|
return ann_elem
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode_points_object(cls, obj):
|
|
|
|
|
ann_elem = cls.encode_annotation(obj)
|
|
|
|
|
def encode_points_object(cls, obj, categories):
|
|
|
|
|
ann_elem = cls.encode_annotation_base(obj)
|
|
|
|
|
|
|
|
|
|
ET.SubElement(ann_elem, 'label').text = \
|
|
|
|
|
str(cls._get_label(obj.label, categories))
|
|
|
|
|
ET.SubElement(ann_elem, 'label_id').text = str(obj.label)
|
|
|
|
|
|
|
|
|
|
x, y, w, h = obj.get_bbox()
|
|
|
|
|
@ -113,20 +119,22 @@ class DatasetItemEncoder:
|
|
|
|
|
ET.SubElement(bbox_elem, 'h').text = str(h)
|
|
|
|
|
ET.SubElement(bbox_elem, 'area').text = str(area)
|
|
|
|
|
|
|
|
|
|
points = ann_elem.points
|
|
|
|
|
points = obj.points
|
|
|
|
|
for i in range(0, len(points), 2):
|
|
|
|
|
point_elem = ET.SubElement(ann_elem, 'point')
|
|
|
|
|
ET.SubElement(point_elem, 'x').text = str(points[i * 2])
|
|
|
|
|
ET.SubElement(point_elem, 'y').text = str(points[i * 2 + 1])
|
|
|
|
|
ET.SubElement(point_elem, 'x').text = str(points[i])
|
|
|
|
|
ET.SubElement(point_elem, 'y').text = str(points[i + 1])
|
|
|
|
|
ET.SubElement(point_elem, 'visible').text = \
|
|
|
|
|
str(ann_elem.visibility[i // 2].name)
|
|
|
|
|
str(obj.visibility[i // 2].name)
|
|
|
|
|
|
|
|
|
|
return ann_elem
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode_polyline_object(cls, obj):
|
|
|
|
|
ann_elem = cls.encode_annotation(obj)
|
|
|
|
|
def encode_polygon_object(cls, obj, categories):
|
|
|
|
|
ann_elem = cls.encode_annotation_base(obj)
|
|
|
|
|
|
|
|
|
|
ET.SubElement(ann_elem, 'label').text = \
|
|
|
|
|
str(cls._get_label(obj.label, categories))
|
|
|
|
|
ET.SubElement(ann_elem, 'label_id').text = str(obj.label)
|
|
|
|
|
|
|
|
|
|
x, y, w, h = obj.get_bbox()
|
|
|
|
|
@ -138,57 +146,142 @@ class DatasetItemEncoder:
|
|
|
|
|
ET.SubElement(bbox_elem, 'h').text = str(h)
|
|
|
|
|
ET.SubElement(bbox_elem, 'area').text = str(area)
|
|
|
|
|
|
|
|
|
|
points = ann_elem.points
|
|
|
|
|
points = obj.points
|
|
|
|
|
for i in range(0, len(points), 2):
|
|
|
|
|
point_elem = ET.SubElement(ann_elem, 'point')
|
|
|
|
|
ET.SubElement(point_elem, 'x').text = str(points[i * 2])
|
|
|
|
|
ET.SubElement(point_elem, 'y').text = str(points[i * 2 + 1])
|
|
|
|
|
ET.SubElement(point_elem, 'x').text = str(points[i])
|
|
|
|
|
ET.SubElement(point_elem, 'y').text = str(points[i + 1])
|
|
|
|
|
|
|
|
|
|
return ann_elem
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode_polyline_object(cls, obj, categories):
|
|
|
|
|
ann_elem = cls.encode_annotation_base(obj)
|
|
|
|
|
|
|
|
|
|
ET.SubElement(ann_elem, 'label').text = \
|
|
|
|
|
str(cls._get_label(obj.label, categories))
|
|
|
|
|
ET.SubElement(ann_elem, 'label_id').text = str(obj.label)
|
|
|
|
|
|
|
|
|
|
x, y, w, h = obj.get_bbox()
|
|
|
|
|
area = w * h
|
|
|
|
|
bbox_elem = ET.SubElement(ann_elem, 'bbox')
|
|
|
|
|
ET.SubElement(bbox_elem, 'x').text = str(x)
|
|
|
|
|
ET.SubElement(bbox_elem, 'y').text = str(y)
|
|
|
|
|
ET.SubElement(bbox_elem, 'w').text = str(w)
|
|
|
|
|
ET.SubElement(bbox_elem, 'h').text = str(h)
|
|
|
|
|
ET.SubElement(bbox_elem, 'area').text = str(area)
|
|
|
|
|
|
|
|
|
|
points = obj.points
|
|
|
|
|
for i in range(0, len(points), 2):
|
|
|
|
|
point_elem = ET.SubElement(ann_elem, 'point')
|
|
|
|
|
ET.SubElement(point_elem, 'x').text = str(points[i])
|
|
|
|
|
ET.SubElement(point_elem, 'y').text = str(points[i + 1])
|
|
|
|
|
|
|
|
|
|
return ann_elem
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode_caption_object(cls, obj):
|
|
|
|
|
ann_elem = cls.encode_annotation(obj)
|
|
|
|
|
ann_elem = cls.encode_annotation_base(obj)
|
|
|
|
|
|
|
|
|
|
ET.SubElement(ann_elem, 'caption').text = str(obj.caption)
|
|
|
|
|
|
|
|
|
|
return ann_elem
|
|
|
|
|
|
|
|
|
|
def encode_object(self, o):
|
|
|
|
|
@classmethod
|
|
|
|
|
def encode_annotation(cls, o, categories=None):
|
|
|
|
|
if isinstance(o, LabelObject):
|
|
|
|
|
return self.encode_label_object(o)
|
|
|
|
|
return cls.encode_label_object(o, categories)
|
|
|
|
|
if isinstance(o, MaskObject):
|
|
|
|
|
return self.encode_mask_object(o)
|
|
|
|
|
return cls.encode_mask_object(o, categories)
|
|
|
|
|
if isinstance(o, BboxObject):
|
|
|
|
|
return self.encode_bbox_object(o)
|
|
|
|
|
return cls.encode_bbox_object(o, categories)
|
|
|
|
|
if isinstance(o, PointsObject):
|
|
|
|
|
return self.encode_points_object(o)
|
|
|
|
|
return cls.encode_points_object(o, categories)
|
|
|
|
|
if isinstance(o, PolyLineObject):
|
|
|
|
|
return self.encode_polyline_object(o)
|
|
|
|
|
return cls.encode_polyline_object(o, categories)
|
|
|
|
|
if isinstance(o, PolygonObject):
|
|
|
|
|
return self.encode_polygon_object(o)
|
|
|
|
|
return cls.encode_polygon_object(o, categories)
|
|
|
|
|
if isinstance(o, CaptionObject):
|
|
|
|
|
return self.encode_caption_object(o)
|
|
|
|
|
if isinstance(o, Annotation): # keep after derived classes
|
|
|
|
|
return self.encode_annotation(o)
|
|
|
|
|
return cls.encode_caption_object(o)
|
|
|
|
|
raise NotImplementedError("Unexpected annotation object passed: %s" % o)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def to_string(encoded_item):
|
|
|
|
|
return ET.tostring(encoded_item, encoding='unicode', pretty_print=True)
|
|
|
|
|
|
|
|
|
|
def XPathDatasetFilter(extractor, xpath=None):
|
|
|
|
|
if xpath is None:
|
|
|
|
|
return extractor
|
|
|
|
|
xpath = ET.XPath(xpath)
|
|
|
|
|
f = lambda item: bool(xpath(
|
|
|
|
|
DatasetItemEncoder.encode(item, extractor.categories())))
|
|
|
|
|
return extractor.select(f)
|
|
|
|
|
|
|
|
|
|
class XPathAnnotationsFilter(Extractor): # NOTE: essentially, a transform
|
|
|
|
|
class ItemWrapper(DatasetItem):
|
|
|
|
|
def __init__(self, item, annotations):
|
|
|
|
|
self._item = item
|
|
|
|
|
self._annotations = annotations
|
|
|
|
|
|
|
|
|
|
@DatasetItem.id.getter
|
|
|
|
|
def id(self):
|
|
|
|
|
return self._item.id
|
|
|
|
|
|
|
|
|
|
@DatasetItem.subset.getter
|
|
|
|
|
def subset(self):
|
|
|
|
|
return self._item.subset
|
|
|
|
|
|
|
|
|
|
if isinstance(o, DatasetItem):
|
|
|
|
|
return self.encode_item(o)
|
|
|
|
|
@DatasetItem.path.getter
|
|
|
|
|
def path(self):
|
|
|
|
|
return self._item.path
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
@DatasetItem.annotations.getter
|
|
|
|
|
def annotations(self):
|
|
|
|
|
return self._annotations
|
|
|
|
|
|
|
|
|
|
class XPathDatasetFilter:
|
|
|
|
|
def __init__(self, filter_text=None):
|
|
|
|
|
self._filter = None
|
|
|
|
|
if filter_text is not None:
|
|
|
|
|
self._filter = ET.XPath(filter_text)
|
|
|
|
|
self._encoder = DatasetItemEncoder()
|
|
|
|
|
@DatasetItem.has_image.getter
|
|
|
|
|
def has_image(self):
|
|
|
|
|
return self._item.has_image
|
|
|
|
|
|
|
|
|
|
def __call__(self, item):
|
|
|
|
|
encoded_item = self._serialize_item(item)
|
|
|
|
|
@DatasetItem.image.getter
|
|
|
|
|
def image(self):
|
|
|
|
|
return self._item.image
|
|
|
|
|
|
|
|
|
|
def __init__(self, extractor, xpath=None, remove_empty=False):
|
|
|
|
|
super().__init__()
|
|
|
|
|
self._extractor = extractor
|
|
|
|
|
|
|
|
|
|
if xpath is not None:
|
|
|
|
|
xpath = ET.XPath(xpath)
|
|
|
|
|
self._filter = xpath
|
|
|
|
|
|
|
|
|
|
self._remove_empty = remove_empty
|
|
|
|
|
|
|
|
|
|
def __len__(self):
|
|
|
|
|
return len(self._extractor)
|
|
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
|
for item in self._extractor:
|
|
|
|
|
item = self._filter_item(item)
|
|
|
|
|
if item is not None:
|
|
|
|
|
yield item
|
|
|
|
|
|
|
|
|
|
def subsets(self):
|
|
|
|
|
return self._extractor.subsets()
|
|
|
|
|
|
|
|
|
|
def categories(self):
|
|
|
|
|
return self._extractor.categories()
|
|
|
|
|
|
|
|
|
|
def _filter_item(self, item):
|
|
|
|
|
if self._filter is None:
|
|
|
|
|
return True
|
|
|
|
|
return bool(self._filter(encoded_item))
|
|
|
|
|
return item
|
|
|
|
|
encoded = DatasetItemEncoder.encode(item, self._extractor.categories())
|
|
|
|
|
filtered = self._filter(encoded)
|
|
|
|
|
filtered = [elem for elem in filtered if elem.tag == 'annotation']
|
|
|
|
|
|
|
|
|
|
encoded = encoded.findall('annotation')
|
|
|
|
|
annotations = [item.annotations[encoded.index(e)] for e in filtered]
|
|
|
|
|
|
|
|
|
|
def _serialize_item(self, item):
|
|
|
|
|
return self._encoder.encode_item(item)
|
|
|
|
|
if self._remove_empty and len(annotations) == 0:
|
|
|
|
|
return None
|
|
|
|
|
return self.ItemWrapper(item, annotations)
|