diff --git a/datumaro/datumaro/components/converters/__init__.py b/datumaro/datumaro/components/converters/__init__.py index a78ba4a7..e2f5e3d8 100644 --- a/datumaro/datumaro/components/converters/__init__.py +++ b/datumaro/datumaro/components/converters/__init__.py @@ -25,6 +25,7 @@ from datumaro.components.converters.voc import ( from datumaro.components.converters.yolo import YoloConverter from datumaro.components.converters.tfrecord import DetectionApiConverter +from datumaro.components.converters.cvat import CvatConverter items = [ @@ -47,4 +48,6 @@ items = [ ('yolo', YoloConverter), ('tf_detection_api', DetectionApiConverter), + + ('cvat', CvatConverter), ] diff --git a/datumaro/datumaro/components/converters/cvat.py b/datumaro/datumaro/components/converters/cvat.py new file mode 100644 index 00000000..242af837 --- /dev/null +++ b/datumaro/datumaro/components/converters/cvat.py @@ -0,0 +1,337 @@ + +# Copyright (C) 2019 Intel Corporation +# +# SPDX-License-Identifier: MIT + +from collections import OrderedDict +import os +import os.path as osp +from xml.sax.saxutils import XMLGenerator + +from datumaro.components.converter import Converter +from datumaro.components.extractor import DEFAULT_SUBSET_NAME, AnnotationType +from datumaro.components.formats.cvat import CvatPath +from datumaro.util.image import save_image + + +def pairwise(iterable): + a = iter(iterable) + return zip(a, a) + +class XmlAnnotationWriter: + VERSION = '1.1' + + def __init__(self, f): + self.xmlgen = XMLGenerator(f, 'utf-8') + self._level = 0 + + def _indent(self, newline = True): + if newline: + self.xmlgen.ignorableWhitespace('\n') + self.xmlgen.ignorableWhitespace(' ' * self._level) + + def _add_version(self): + self._indent() + self.xmlgen.startElement('version', {}) + self.xmlgen.characters(self.VERSION) + self.xmlgen.endElement('version') + + def open_root(self): + self.xmlgen.startDocument() + self.xmlgen.startElement('annotations', {}) + self._level += 1 + self._add_version() + + def _add_meta(self, meta): + self._level += 1 + for k, v in meta.items(): + if isinstance(v, OrderedDict): + self._indent() + self.xmlgen.startElement(k, {}) + self._add_meta(v) + self._indent() + self.xmlgen.endElement(k) + elif isinstance(v, list): + self._indent() + self.xmlgen.startElement(k, {}) + for tup in v: + self._add_meta(OrderedDict([tup])) + self._indent() + self.xmlgen.endElement(k) + else: + self._indent() + self.xmlgen.startElement(k, {}) + self.xmlgen.characters(v) + self.xmlgen.endElement(k) + self._level -= 1 + + def write_meta(self, meta): + self._indent() + self.xmlgen.startElement('meta', {}) + self._add_meta(meta) + self._indent() + self.xmlgen.endElement('meta') + + def open_track(self, track): + self._indent() + self.xmlgen.startElement('track', track) + self._level += 1 + + def open_image(self, image): + self._indent() + self.xmlgen.startElement('image', image) + self._level += 1 + + def open_box(self, box): + self._indent() + self.xmlgen.startElement('box', box) + self._level += 1 + + def open_polygon(self, polygon): + self._indent() + self.xmlgen.startElement('polygon', polygon) + self._level += 1 + + def open_polyline(self, polyline): + self._indent() + self.xmlgen.startElement('polyline', polyline) + self._level += 1 + + def open_points(self, points): + self._indent() + self.xmlgen.startElement('points', points) + self._level += 1 + + def add_attribute(self, attribute): + self._indent() + self.xmlgen.startElement('attribute', {'name': attribute['name']}) + self.xmlgen.characters(attribute['value']) + self.xmlgen.endElement('attribute') + + def _close_element(self, element): + self._level -= 1 + self._indent() + self.xmlgen.endElement(element) + + def close_box(self): + self._close_element('box') + + def close_polygon(self): + self._close_element('polygon') + + def close_polyline(self): + self._close_element('polyline') + + def close_points(self): + self._close_element('points') + + def close_image(self): + self._close_element('image') + + def close_track(self): + self._close_element('track') + + def close_root(self): + self._close_element('annotations') + self.xmlgen.endDocument() + +class _SubsetWriter: + def __init__(self, file, name, extractor, context): + self._writer = XmlAnnotationWriter(file) + self._name = name + self._extractor = extractor + self._context = context + + def write(self): + self._writer.open_root() + self._write_meta() + + for item in self._extractor: + if self._context._save_images: + self._save_image(item) + self._write_item(item) + + self._writer.close_root() + + def _save_image(self, item): + image = item.image + if image is None: + return + + image_path = osp.join(self._context._images_dir, + str(item.id) + CvatPath.IMAGE_EXT) + save_image(image_path, image) + + def _write_item(self, item): + h, w = 0, 0 + if item.has_image: + h, w = item.image.shape[:2] + self._writer.open_image(OrderedDict([ + ("id", str(item.id)), + ("name", str(item.id)), + ("width", str(w)), + ("height", str(h)) + ])) + + for ann in item.annotations: + if ann.type in {AnnotationType.points, AnnotationType.polyline, + AnnotationType.polygon, AnnotationType.bbox}: + self._write_shape(ann) + else: + continue + + self._writer.close_image() + + def _write_meta(self): + label_cat = self._extractor.categories()[AnnotationType.label] + meta = OrderedDict([ + ("task", OrderedDict([ + ("id", ""), + ("name", self._name), + ("size", str(len(self._extractor))), + ("mode", "annotation"), + ("overlap", ""), + ("start_frame", "0"), + ("stop_frame", str(len(self._extractor))), + ("frame_filter", ""), + ("z_order", "True"), + + ("labels", [ + ("label", OrderedDict([ + ("name", label.name), + ("attributes", [ + ("attribute", OrderedDict([ + ("name", attr), + ("mutable", "True"), + ("input_type", "text"), + ("default_value", ""), + ("values", ""), + ])) for attr in label.attributes + ]) + ])) for label in label_cat.items + ]), + ])), + ]) + self._writer.write_meta(meta) + + def _get_label(self, label_id): + label_cat = self._extractor.categories()[AnnotationType.label] + return label_cat.items[label_id] + + def _write_shape(self, shape): + if shape.label is None: + return + + shape_data = OrderedDict([ + ("label", self._get_label(shape.label).name), + ("occluded", str(int(shape.attributes.get('occluded', False)))), + ]) + + points = shape.get_points() + if shape.type == AnnotationType.bbox: + shape_data.update(OrderedDict([ + ("xtl", "{:.2f}".format(points[0])), + ("ytl", "{:.2f}".format(points[1])), + ("xbr", "{:.2f}".format(points[2])), + ("ybr", "{:.2f}".format(points[3])) + ])) + else: + shape_data.update(OrderedDict([ + ("points", ';'.join(( + ','.join(( + "{:.2f}".format(x), + "{:.2f}".format(y) + )) for x, y in pairwise(points)) + )), + ])) + + shape_data['z_order'] = str(int(shape.attributes.get('z_order', 0))) + if shape.group is not None: + shape_data['group_id'] = str(shape.group) + + if shape.type == AnnotationType.bbox: + self._writer.open_box(shape_data) + elif shape.type == AnnotationType.polygon: + self._writer.open_polygon(shape_data) + elif shape.type == AnnotationType.polyline: + self._writer.open_polyline(shape_data) + elif shape.type == AnnotationType.points: + self._writer.open_points(shape_data) + else: + raise NotImplementedError("unknown shape type") + + for attr_name, attr_value in shape.attributes.items(): + if attr_name in self._get_label(shape.label).attributes: + self._writer.add_attribute(OrderedDict([ + ("name", str(attr_name)), + ("value", str(attr_value)), + ])) + + if shape.type == AnnotationType.bbox: + self._writer.close_box() + elif shape.type == AnnotationType.polygon: + self._writer.close_polygon() + elif shape.type == AnnotationType.polyline: + self._writer.close_polyline() + elif shape.type == AnnotationType.points: + self._writer.close_points() + else: + raise NotImplementedError("unknown shape type") + +class _Converter: + def __init__(self, extractor, save_dir, save_images=False): + self._extractor = extractor + self._save_dir = save_dir + self._save_images = save_images + + def convert(self): + os.makedirs(self._save_dir, exist_ok=True) + + images_dir = osp.join(self._save_dir, CvatPath.IMAGES_DIR) + os.makedirs(images_dir, exist_ok=True) + self._images_dir = images_dir + + annotations_dir = osp.join(self._save_dir, CvatPath.ANNOTATIONS_DIR) + os.makedirs(annotations_dir, exist_ok=True) + self._annotations_dir = annotations_dir + + subsets = self._extractor.subsets() + if len(subsets) == 0: + subsets = [ None ] + + for subset_name in subsets: + if subset_name: + subset = self._extractor.get_subset(subset_name) + else: + subset_name = DEFAULT_SUBSET_NAME + subset = self._extractor + + with open(osp.join(annotations_dir, '%s.xml' % subset_name), 'w') as f: + writer = _SubsetWriter(f, subset_name, subset, self) + writer.write() + +class CvatConverter(Converter): + def __init__(self, save_images=False, cmdline_args=None): + super().__init__() + + self._options = { + 'save_images': save_images, + } + + if cmdline_args is not None: + self._options.update(self._parse_cmdline(cmdline_args)) + + @classmethod + def build_cmdline_parser(cls, parser=None): + import argparse + if not parser: + parser = argparse.ArgumentParser() + + parser.add_argument('--save-images', action='store_true', + help="Save images (default: %(default)s)") + + return parser + + def __call__(self, extractor, save_dir): + converter = _Converter(extractor, save_dir, **self._options) + converter.convert() \ No newline at end of file diff --git a/datumaro/tests/test_cvat_format.py b/datumaro/tests/test_cvat_format.py index bc06b056..1cbdb743 100644 --- a/datumaro/tests/test_cvat_format.py +++ b/datumaro/tests/test_cvat_format.py @@ -10,6 +10,8 @@ from datumaro.components.extractor import (Extractor, DatasetItem, LabelCategories, ) from datumaro.components.importers.cvat import CvatImporter +from datumaro.components.converters.cvat import CvatConverter +from datumaro.components.project import Project import datumaro.components.formats.cvat as Cvat from datumaro.util.image import save_image from datumaro.util.test_utils import TestDir @@ -145,4 +147,122 @@ class CvatExtractorTest(TestCase): for item_a, item_b in zip(source_subset, parsed_subset): self.assertEqual(len(item_a.annotations), len(item_b.annotations)) for ann_a, ann_b in zip(item_a.annotations, item_b.annotations): - self.assertEqual(ann_a, ann_b) \ No newline at end of file + self.assertEqual(ann_a, ann_b) + + +class CvatConverterTest(TestCase): + def _test_save_and_load(self, source_dataset, converter, test_dir, + importer_params=None, target_dataset=None): + converter(source_dataset, test_dir.path) + + if not importer_params: + importer_params = {} + project = Project.import_from(test_dir.path, 'cvat', **importer_params) + parsed_dataset = project.make_dataset() + + if target_dataset is not None: + source_dataset = target_dataset + self.assertListEqual( + sorted(source_dataset.subsets()), + sorted(parsed_dataset.subsets()), + ) + + self.assertEqual(len(source_dataset), len(parsed_dataset)) + + for subset_name in source_dataset.subsets(): + source_subset = source_dataset.get_subset(subset_name) + parsed_subset = parsed_dataset.get_subset(subset_name) + self.assertEqual(len(source_subset), len(parsed_subset)) + for idx, (item_a, item_b) in enumerate( + zip(source_subset, parsed_subset)): + self.assertEqual(item_a, item_b, str(idx)) + + def test_can_save_and_load(self): + label_categories = LabelCategories() + for i in range(10): + label_categories.add(str(i)) + label_categories.items[2].attributes.update(['a1', 'a2']) + label_categories.attributes.update(['z_order', 'occluded']) + + class SrcTestExtractor(Extractor): + def __iter__(self): + return iter([ + DatasetItem(id=0, subset='s1', image=np.zeros((5, 10, 3)), + annotations=[ + PolygonObject([0, 0, 4, 0, 4, 4], + label=1, group=4, + attributes={ 'occluded': True }), + PolygonObject([5, 0, 9, 0, 5, 5], + label=2, group=4, + attributes={ 'unknown': 'bar' }), + PointsObject([1, 1, 3, 2, 2, 3], + label=2, + attributes={ 'a1': 'x', 'a2': 42 }), + ] + ), + DatasetItem(id=1, subset='s1', + annotations=[ + PolyLineObject([0, 0, 4, 0, 4, 4], + label=3, id=4, group=4), + BboxObject(5, 0, 1, 9, + label=3, id=4, group=4), + ] + ), + + DatasetItem(id=0, subset='s2', image=np.zeros((5, 10, 3)), + annotations=[ + PolygonObject([0, 0, 4, 0, 4, 4], + label=3, group=4, + attributes={ 'z_order': 1, 'occluded': False }), + PolyLineObject([5, 0, 9, 0, 5, 5]), # will be skipped + ] + ), + ]) + + def categories(self): + return { AnnotationType.label: label_categories } + + class DstTestExtractor(Extractor): + def __iter__(self): + return iter([ + DatasetItem(id=0, subset='s1', image=np.zeros((5, 10, 3)), + annotations=[ + PolygonObject([0, 0, 4, 0, 4, 4], + label=1, group=4, + attributes={ 'z_order': 0, 'occluded': True }), + PolygonObject([5, 0, 9, 0, 5, 5], + label=2, group=4, + attributes={ 'z_order': 0, 'occluded': False }), + PointsObject([1, 1, 3, 2, 2, 3], + label=2, + attributes={ 'z_order': 0, 'occluded': False, + 'a1': 'x', 'a2': '42' }), + ] + ), + DatasetItem(id=1, subset='s1', + annotations=[ + PolyLineObject([0, 0, 4, 0, 4, 4], + label=3, group=4, + attributes={ 'z_order': 0, 'occluded': False }), + BboxObject(5, 0, 1, 9, + label=3, group=4, + attributes={ 'z_order': 0, 'occluded': False }), + ] + ), + + DatasetItem(id=0, subset='s2', image=np.zeros((5, 10, 3)), + annotations=[ + PolygonObject([0, 0, 4, 0, 4, 4], + label=3, group=4, + attributes={ 'z_order': 1, 'occluded': False }), + ] + ), + ]) + + def categories(self): + return { AnnotationType.label: label_categories } + + with TestDir() as test_dir: + self._test_save_and_load(SrcTestExtractor(), + CvatConverter(save_images=True), test_dir, + target_dataset=DstTestExtractor())