[Datumaro] Instance polygon-mask conversions in COCO format (#1008)

* Microoptimizations

* Mask conversion functions

* Add mask-polygon conversions

* Add mask-polygon conversions in coco

* Add mask-polygon conversions in coco

* Update requirements

* Option to disable crop

* Fix cli parameter passing

* Fix test

* Fixes in COCO
main
zhiltsov-max 6 years ago committed by Nikita Manovich
parent 8da20b38d5
commit e0bcc4652b

@ -3,8 +3,10 @@
#
# SPDX-License-Identifier: MIT
from enum import Enum
from itertools import groupby
import json
import numpy as np
import logging as log
import os
import os.path as osp
@ -12,7 +14,7 @@ import pycocotools.mask as mask_utils
from datumaro.components.converter import Converter
from datumaro.components.extractor import (
DEFAULT_SUBSET_NAME, AnnotationType, PointsObject, BboxObject
DEFAULT_SUBSET_NAME, AnnotationType, PointsObject, BboxObject, MaskObject
)
from datumaro.components.formats.ms_coco import CocoTask, CocoPath
from datumaro.util import find
@ -28,6 +30,9 @@ def _cast(value, type_conv, default=None):
except Exception:
return default
SegmentationMode = Enum('SegmentationMode', ['guess', 'polygons', 'mask'])
class _TaskConverter:
def __init__(self, context):
self._min_ann_id = 1
@ -108,136 +113,6 @@ class _TaskConverter:
self._min_ann_id = max(ann_id, self._min_ann_id)
return ann_id
class _InstancesConverter(_TaskConverter):
def save_categories(self, dataset):
label_categories = dataset.categories().get(AnnotationType.label)
if label_categories is None:
return
for idx, cat in enumerate(label_categories.items):
self.categories.append({
'id': 1 + idx,
'name': _cast(cat.name, str, ''),
'supercategory': _cast(cat.parent, str, ''),
})
def save_annotations(self, item):
annotations = item.annotations.copy()
while len(annotations) != 0:
ann = annotations.pop()
if ann.type == AnnotationType.bbox and ann.label is not None:
pass
elif ann.type == AnnotationType.polygon and ann.label is not None:
pass
elif ann.type == AnnotationType.mask and ann.label is not None:
pass
else:
continue
bbox = None
segmentation = None
if ann.type == AnnotationType.bbox:
is_crowd = ann.attributes.get('is_crowd', False)
bbox = ann.get_bbox()
elif ann.type == AnnotationType.polygon:
is_crowd = ann.attributes.get('is_crowd', False)
elif ann.type == AnnotationType.mask:
is_crowd = ann.attributes.get('is_crowd', True)
if is_crowd:
segmentation = ann
area = None
# If ann in a group, try to find corresponding annotations in
# this group, otherwise try to infer them.
if bbox is None and ann.group is not None:
bbox = find(annotations, lambda x: \
x.group == ann.group and \
x.type == AnnotationType.bbox and \
x.label == ann.label)
if bbox is not None:
bbox = bbox.get_bbox()
if is_crowd:
# is_crowd=True means there should be a mask
if segmentation is None and ann.group is not None:
segmentation = find(annotations, lambda x: \
x.group == ann.group and \
x.type == AnnotationType.mask and \
x.label == ann.label)
if segmentation is not None:
binary_mask = np.array(segmentation.image, dtype=np.bool)
binary_mask = np.asfortranarray(binary_mask, dtype=np.uint8)
segmentation = mask_utils.encode(binary_mask)
area = mask_utils.area(segmentation)
segmentation = mask_tools.convert_mask_to_rle(binary_mask)
else:
# is_crowd=False means there are some polygons
polygons = []
if ann.type == AnnotationType.polygon:
polygons = [ ann ]
if ann.group is not None:
# A single object can consist of several polygons
polygons += [p for p in annotations
if p.group == ann.group and \
p.type == AnnotationType.polygon and \
p.label == ann.label]
if polygons:
segmentation = [p.get_points() for p in polygons]
h, w = item.image.shape[:2]
rles = mask_utils.frPyObjects(segmentation, h, w)
rle = mask_utils.merge(rles)
area = mask_utils.area(rle)
if self._context._merge_polygons:
binary_mask = mask_utils.decode(rle).astype(np.bool)
binary_mask = np.asfortranarray(binary_mask, dtype=np.uint8)
segmentation = mask_tools.convert_mask_to_rle(binary_mask)
is_crowd = True
bbox = [int(i) for i in mask_utils.toBbox(rle)]
if ann.group is not None:
# Mark the group as visited to prevent repeats
for a in annotations[:]:
if a.group == ann.group:
annotations.remove(a)
if segmentation is None:
is_crowd = False
segmentation = [ann.get_polygon()]
area = ann.area()
if self._context._merge_polygons:
h, w = item.image.shape[:2]
rles = mask_utils.frPyObjects(segmentation, h, w)
rle = mask_utils.merge(rles)
area = mask_utils.area(rle)
binary_mask = mask_utils.decode(rle).astype(np.bool)
binary_mask = np.asfortranarray(binary_mask, dtype=np.uint8)
segmentation = mask_tools.convert_mask_to_rle(binary_mask)
is_crowd = True
bbox = [int(i) for i in mask_utils.toBbox(rle)]
if bbox is None:
bbox = ann.get_bbox()
elem = {
'id': self._get_ann_id(ann),
'image_id': _cast(item.id, int, 0),
'category_id': _cast(ann.label, int, -1) + 1,
'segmentation': segmentation,
'area': float(area),
'bbox': bbox,
'iscrowd': int(is_crowd),
}
if 'score' in ann.attributes:
elem['score'] = float(ann.attributes['score'])
self.annotations.append(elem)
class _ImageInfoConverter(_TaskConverter):
def is_empty(self):
return len(self._data['images']) == 0
@ -268,7 +143,210 @@ class _CaptionsConverter(_TaskConverter):
self.annotations.append(elem)
class _KeypointsConverter(_TaskConverter):
class _InstancesConverter(_TaskConverter):
    """Writes the COCO 'instances' task: one record per object instance with
    a bbox, area, and either polygon or RLE-mask segmentation.
    """

    def save_categories(self, dataset):
        """Fill the COCO 'categories' section from the dataset label categories."""
        label_categories = dataset.categories().get(AnnotationType.label)
        if label_categories is None:
            return

        for idx, cat in enumerate(label_categories.items):
            self.categories.append({
                'id': 1 + idx,  # COCO category ids are 1-based
                'name': _cast(cat.name, str, ''),
                'supercategory': _cast(cat.parent, str, ''),
            })

    @classmethod
    def crop_segments(cls, instances, img_width, img_height):
        """Crop instance segments occluded by foreground instances.

        instances: list of [leader_ann, polygons, mask_rle, bbox] entries,
            modified in place; processed from background to foreground
            by the leader's z_order.
        Returns the (z_order-sorted) instances list.
        """
        instances = sorted(instances, key=lambda x: x[0].z_order)

        # Flatten all segments into a single list, remembering which
        # instance each segment came from.
        segment_map = []
        segments = []
        for inst_idx, (_, polygons, mask, _) in enumerate(instances):
            if polygons:
                segment_map.extend(inst_idx for p in polygons)
                segments.extend(polygons)
            elif mask is not None:
                segment_map.append(inst_idx)
                segments.append(mask)

        segments = mask_tools.crop_covered_segments(
            segments, img_width, img_height)

        # Write the cropped segments back into their owning instances
        for inst_idx, inst in enumerate(instances):
            new_segments = [s for si_id, s in zip(segment_map, segments)
                if si_id == inst_idx]

            if not new_segments:
                # The instance was fully covered (or too small) — drop it
                inst[1] = []
                inst[2] = None
                continue

            if inst[1]:
                # Polygon-based instance: concatenate cropped polygon parts
                inst[1] = sum(new_segments, [])
            else:
                # Mask-based instance: merge cropped masks back into one RLE
                mask = cls.merge_masks(new_segments)
                inst[2] = mask_tools.mask_to_rle(mask)

        return instances

    def find_instance_parts(self, group, img_width, img_height):
        """Reduce an annotation group to [leader, polygons, mask, bbox],
        converting between polygons and masks per the segmentation mode.
        """
        boxes = [a for a in group if a.type == AnnotationType.bbox]
        polygons = [a for a in group if a.type == AnnotationType.polygon]
        masks = [a for a in group if a.type == AnnotationType.mask]

        anns = boxes + polygons + masks
        leader = self.find_group_leader(anns)
        bbox = self.compute_bbox(anns)

        mask = None
        polygons = [p.get_polygon() for p in polygons]

        if self._context._segmentation_mode == SegmentationMode.guess:
            # Prefer a mask if the leader is marked 'is_crowd', or (when
            # unmarked) if the group has a mask with the leader's label
            use_masks = leader.attributes.get('is_crowd',
                find(masks, lambda x: x.label == leader.label) is not None)
        elif self._context._segmentation_mode == SegmentationMode.polygons:
            use_masks = False
        elif self._context._segmentation_mode == SegmentationMode.mask:
            use_masks = True
        else:
            raise NotImplementedError("Unexpected segmentation mode '%s'" % \
                self._context._segmentation_mode)

        if use_masks:
            # Rasterize polygons and merge everything into a single RLE mask
            if polygons:
                mask = mask_tools.rles_to_mask(polygons, img_width, img_height)

            if masks:
                if mask is not None:
                    masks += [mask]
                mask = self.merge_masks(masks)

            if mask is not None:
                mask = mask_tools.mask_to_rle(mask)
            polygons = []
        else:
            # Vectorize masks into polygons and keep only polygons
            if masks:
                mask = self.merge_masks(masks)
                polygons += mask_tools.mask_to_polygons(mask)
            mask = None

        return [leader, polygons, mask, bbox]

    @staticmethod
    def find_group_leader(group):
        """The largest annotation of a group defines its label and attributes."""
        return max(group, key=lambda x: x.area())

    @staticmethod
    def merge_masks(masks):
        """OR-combine binary masks; items may be MaskObject or raw arrays.

        Returns None for an empty input.
        """
        if not masks:
            return None

        def get_mask(m):
            if isinstance(m, MaskObject):
                return m.image
            else:
                return m

        binary_mask = get_mask(masks[0])
        for m in masks[1:]:
            binary_mask |= get_mask(m)

        return binary_mask

    @staticmethod
    def compute_bbox(annotations):
        """Return the [x, y, w, h] box enclosing all the annotations' boxes."""
        boxes = [ann.get_bbox() for ann in annotations]
        x0 = min((b[0] for b in boxes), default=0)
        y0 = min((b[1] for b in boxes), default=0)
        x1 = max((b[0] + b[2] for b in boxes), default=0)
        y1 = max((b[1] + b[3] for b in boxes), default=0)
        return [x0, y0, x1 - x0, y1 - y0]

    @staticmethod
    def find_instance_anns(annotations):
        """Annotations that can contribute to an instance record.

        Labelless masks are excluded; boxes and polygons are kept as-is.
        """
        return [a for a in annotations
            if a.type in { AnnotationType.bbox, AnnotationType.polygon } or \
                a.type == AnnotationType.mask and a.label is not None
        ]

    @classmethod
    def find_instances(cls, annotations):
        """Split annotations into instances by their 'group' id;
        ungrouped (group=None) annotations each form their own instance.
        """
        instance_anns = cls.find_instance_anns(annotations)

        ann_groups = []
        # NOTE(review): itertools.groupby only merges *adjacent* equal keys,
        # so this assumes same-group annotations are stored contiguously —
        # confirm, or sort instance_anns by group first.
        for g_id, group in groupby(instance_anns, lambda a: a.group):
            if g_id is None:
                ann_groups.extend(([a] for a in group))
            else:
                ann_groups.append(list(group))

        return ann_groups

    def save_annotations(self, item):
        """Convert and record all instance annotations of a dataset item."""
        instances = self.find_instances(item.annotations)
        if not instances:
            return

        if not item.has_image:
            # log.warn is a deprecated alias of log.warning
            log.warning("Skipping writing instances for "
                "item '%s' as it has no image info" % item.id)
            return
        # Use shape[:2] so grayscale images work too (consistent with the
        # rest of this file)
        h, w = item.image.shape[:2]
        instances = [self.find_instance_parts(i, w, h) for i in instances]

        if self._context._crop_covered:
            instances = self.crop_segments(instances, w, h)

        for instance in instances:
            elem = self.convert_instance(instance, item)
            if elem:
                self.annotations.append(elem)

    def convert_instance(self, instance, item):
        """Build one COCO annotation dict from an instance tuple.

        instance: [leader_ann, polygons, mask_rle, bbox] as produced by
            find_instance_parts().
        """
        ann, polygons, mask, bbox = instance

        # In COCO, iscrowd=1 annotations carry an RLE mask, iscrowd=0 carry
        # polygons
        is_crowd = mask is not None
        if is_crowd:
            segmentation = mask
        else:
            segmentation = [list(map(float, p)) for p in polygons]

        area = 0
        if segmentation:
            if item.has_image:
                h, w = item.image.shape[:2]
            else:
                # NOTE: here we can guess the image size as
                # it is only needed for the area computation
                w = bbox[0] + bbox[2]
                h = bbox[1] + bbox[3]
            rles = mask_utils.frPyObjects(segmentation, h, w)
            if is_crowd:
                rles = [rles]
            else:
                rles = mask_utils.merge(rles)
            area = mask_utils.area(rles)
        else:
            # No segmentation available — use the bbox rectangle as a polygon
            x, y, w, h = bbox
            segmentation = [[x, y, x + w, y, x + w, y + h, x, y + h]]
            area = w * h

        elem = {
            'id': self._get_ann_id(ann),
            'image_id': _cast(item.id, int, 0),
            'category_id': _cast(ann.label, int, -1) + 1,  # 1-based ids
            'segmentation': segmentation,
            'area': float(area),
            'bbox': list(map(float, bbox)),
            'iscrowd': int(is_crowd),
        }
        if 'score' in ann.attributes:
            elem['score'] = float(ann.attributes['score'])

        return elem
class _KeypointsConverter(_InstancesConverter):
def save_categories(self, dataset):
label_categories = dataset.categories().get(AnnotationType.label)
if label_categories is None:
@ -290,45 +368,61 @@ class _KeypointsConverter(_TaskConverter):
self.categories.append(cat)
def save_annotations(self, item):
for ann in item.annotations:
if ann.type != AnnotationType.points:
continue
point_annotations = [a for a in item.annotations
if a.type == AnnotationType.points]
if not point_annotations:
return
elem = {
'id': self._get_ann_id(ann),
'image_id': _cast(item.id, int, 0),
'category_id': _cast(ann.label, int, -1) + 1,
}
if 'score' in ann.attributes:
elem['score'] = float(ann.attributes['score'])
# Create annotations for solitary keypoints annotations
for points in self.find_solitary_points(item.annotations):
instance = [points, [], None, points.get_bbox()]
elem = super().convert_instance(instance, item)
elem.update(self.convert_points_object(points))
if elem:
self.annotations.append(elem)
keypoints = []
points = ann.get_points()
visibility = ann.visibility
for index in range(0, len(points), 2):
kp = points[index : index + 2]
state = visibility[index // 2].value
keypoints.extend([*kp, state])
num_visible = len([v for v in visibility \
if v == PointsObject.Visibility.visible])
bbox = find(item.annotations, lambda x: \
x.group == ann.group and \
x.type == AnnotationType.bbox and
x.label == ann.label)
if bbox is None:
bbox = BboxObject(*ann.get_bbox())
elem.update({
'segmentation': bbox.get_polygon(),
'area': bbox.area(),
'bbox': bbox.get_bbox(),
'iscrowd': 0,
'keypoints': keypoints,
'num_keypoints': num_visible,
})
# Create annotations for complete instance + keypoints annotations
super().save_annotations(item)
self.annotations.append(elem)
@classmethod
def find_solitary_points(cls, annotations):
solitary_points = []
for g_id, group in groupby(annotations, lambda a: a.group):
if g_id is not None and not cls.find_instance_anns(group):
group = [a for a in group if a.type == AnnotationType.points]
solitary_points.extend(group)
return solitary_points
@staticmethod
def convert_points_object(ann):
keypoints = []
points = ann.get_points()
visibility = ann.visibility
for index in range(0, len(points), 2):
kp = points[index : index + 2]
state = visibility[index // 2].value
keypoints.extend([*kp, state])
num_annotated = len([v for v in visibility \
if v != PointsObject.Visibility.absent])
return {
'keypoints': keypoints,
'num_keypoints': num_annotated,
}
def convert_instance(self, instance, item):
points_ann = find(item.annotations, lambda x: \
x.type == AnnotationType.points and x.group == instance[0].group)
if not points_ann:
return None
elem = super().convert_instance(instance, item)
elem.update(self.convert_points_object(points_ann))
return elem
class _LabelsConverter(_TaskConverter):
def save_categories(self, dataset):
@ -368,7 +462,8 @@ class _Converter:
}
def __init__(self, extractor, save_dir,
tasks=None, save_images=False, merge_polygons=False):
tasks=None, save_images=False, segmentation_mode=None,
crop_covered=False):
assert tasks is None or isinstance(tasks, (CocoTask, list))
if tasks is None:
tasks = list(self._TASK_CONVERTER)
@ -383,7 +478,17 @@ class _Converter:
self._save_dir = save_dir
self._save_images = save_images
self._merge_polygons = merge_polygons
assert segmentation_mode is None or \
segmentation_mode in SegmentationMode or \
isinstance(segmentation_mode, str)
if segmentation_mode is None:
segmentation_mode = SegmentationMode.guess
if isinstance(segmentation_mode, str):
segmentation_mode = SegmentationMode[segmentation_mode]
self._segmentation_mode = segmentation_mode
self._crop_covered = crop_covered
def make_dirs(self):
self._images_dir = osp.join(self._save_dir, CocoPath.IMAGES_DIR)
@ -442,14 +547,16 @@ class _Converter:
class CocoConverter(Converter):
def __init__(self,
tasks=None, save_images=False, merge_polygons=False,
tasks=None, save_images=False, segmentation_mode=None,
crop_covered=False,
cmdline_args=None):
super().__init__()
self._options = {
'tasks': tasks,
'save_images': save_images,
'merge_polygons': merge_polygons,
'segmentation_mode': segmentation_mode,
'crop_covered': crop_covered,
}
if cmdline_args is not None:
@ -467,8 +574,20 @@ class CocoConverter(Converter):
parser.add_argument('--save-images', action='store_true',
help="Save images (default: %(default)s)")
parser.add_argument('--merge-polygons', action='store_true',
help="Merge instance polygons into a mask (default: %(default)s)")
parser.add_argument('--segmentation-mode',
choices=[m.name for m in SegmentationMode],
default=SegmentationMode.guess.name,
help="Save mode for instance segmentation: "
"- '{sm.guess.name}': guess the mode for each instance, "
"use 'is_crowd' attribute as hint; "
"- '{sm.polygons.name}': save polygons, "
"merge and convert masks, prefer polygons; "
"- '{sm.mask.name}': save masks, "
"merge and convert polygons, prefer masks; "
"(default: %(default)s)".format(sm=SegmentationMode))
parser.add_argument('--crop-covered', action='store_true',
help="Crop covered segments so that background objects' "
"segmentation was more accurate (default: %(default)s)")
parser.add_argument('--tasks', type=cls._split_tasks_string,
default=None,
help="COCO task filter, comma-separated list of {%s} "

@ -159,12 +159,16 @@ class MaskCategories(Categories):
class MaskObject(Annotation):
# pylint: disable=redefined-builtin
def __init__(self, image=None, label=None,
def __init__(self, image=None, label=None, z_order=None,
id=None, attributes=None, group=None):
super().__init__(id=id, type=AnnotationType.mask,
attributes=attributes, group=group)
self._image = image
self._label = label
if z_order is None:
z_order = 0
self._z_order = z_order
# pylint: enable=redefined-builtin
@property
@ -181,22 +185,69 @@ class MaskObject(Annotation):
raise NotImplementedError()
def area(self):
raise NotImplementedError()
if self._label is None:
raise NotImplementedError()
return np.count_nonzero(self.image)
def extract(self, class_id):
raise NotImplementedError()
def bbox(self):
raise NotImplementedError()
def get_bbox(self):
if self._label is None:
raise NotImplementedError()
image = self.image
cols = np.any(image, axis=0)
rows = np.any(image, axis=1)
x0, x1 = np.where(cols)[0][[0, -1]]
y0, y1 = np.where(rows)[0][[0, -1]]
return [x0, y0, x1 - x0, y1 - y0]
@property
def z_order(self):
return self._z_order
def __eq__(self, other):
if not super().__eq__(other):
return False
return \
(self.label == other.label) and \
(self.z_order == other.z_order) and \
(self.image is not None and other.image is not None and \
np.all(self.image == other.image))
class RleMask(MaskObject):
    """A mask stored as a COCO run-length encoding; the dense binary image
    is decoded lazily, only when the 'image' property is accessed.
    """

    # pylint: disable=redefined-builtin
    def __init__(self, rle=None, label=None, z_order=None,
            id=None, attributes=None, group=None):
        lazy_decode = self._lazy_decode(rle)
        super().__init__(image=lazy_decode, label=label, z_order=z_order,
            id=id, attributes=attributes, group=group)

        self._rle = rle
    # pylint: enable=redefined-builtin

    @staticmethod
    def _lazy_decode(rle):
        """Return a thunk decoding the RLE to a dense boolean mask."""
        from pycocotools import mask as mask_utils
        # np.bool was a deprecated alias of the builtin bool and was removed
        # in NumPy 1.24 — use bool directly
        return lambda: mask_utils.decode(rle).astype(bool)

    def area(self):
        """Pixel area of the mask, computed directly from the RLE."""
        from pycocotools import mask as mask_utils
        return mask_utils.area(self._rle)

    def bbox(self):
        """Tight [x, y, w, h] box of the mask, computed from the RLE."""
        from pycocotools import mask as mask_utils
        return mask_utils.toBbox(self._rle)

    @property
    def rle(self):
        # Raw COCO RLE object as passed to the constructor
        return self._rle

    def __eq__(self, other):
        if not isinstance(other, __class__):
            # Fall back to the (decoded) image comparison of the base class
            return super().__eq__(other)
        return self._rle == other._rle
def compute_iou(bbox_a, bbox_b):
aX, aY, aW, aH = bbox_a
bX, bY, bW, bH = bbox_b
@ -217,12 +268,16 @@ def compute_iou(bbox_a, bbox_b):
class ShapeObject(Annotation):
# pylint: disable=redefined-builtin
def __init__(self, type, points=None, label=None,
def __init__(self, type, points=None, label=None, z_order=None,
id=None, attributes=None, group=None):
super().__init__(id=id, type=type,
attributes=attributes, group=group)
self.points = points
self.label = label
if z_order is None:
z_order = 0
self._z_order = z_order
# pylint: enable=redefined-builtin
def area(self):
@ -247,22 +302,24 @@ class ShapeObject(Annotation):
def get_points(self):
return self.points
def get_mask(self):
raise NotImplementedError()
@property
def z_order(self):
return self._z_order
def __eq__(self, other):
if not super().__eq__(other):
return False
return \
(self.points == other.points) and \
(self.z_order == other.z_order) and \
(self.label == other.label)
class PolyLineObject(ShapeObject):
# pylint: disable=redefined-builtin
def __init__(self, points=None,
label=None, id=None, attributes=None, group=None):
def __init__(self, points=None, label=None, z_order=None,
id=None, attributes=None, group=None):
super().__init__(type=AnnotationType.polyline,
points=points, label=label,
points=points, label=label, z_order=z_order,
id=id, attributes=attributes, group=group)
# pylint: enable=redefined-builtin
@ -274,12 +331,12 @@ class PolyLineObject(ShapeObject):
class PolygonObject(ShapeObject):
# pylint: disable=redefined-builtin
def __init__(self, points=None,
def __init__(self, points=None, z_order=None,
label=None, id=None, attributes=None, group=None):
if points is not None:
assert len(points) % 2 == 0 and 3 <= len(points) // 2, "Wrong polygon points: %s" % points
super().__init__(type=AnnotationType.polygon,
points=points, label=label,
points=points, label=label, z_order=z_order,
id=id, attributes=attributes, group=group)
# pylint: enable=redefined-builtin
@ -291,15 +348,15 @@ class PolygonObject(ShapeObject):
_, _, w, h = self.get_bbox()
rle = mask_utils.frPyObjects([self.get_points()], h, w)
area = mask_utils.area(rle)
area = mask_utils.area(rle)[0]
return area
class BboxObject(ShapeObject):
# pylint: disable=redefined-builtin
def __init__(self, x=0, y=0, w=0, h=0,
label=None, id=None, attributes=None, group=None):
def __init__(self, x=0, y=0, w=0, h=0, label=None, z_order=None,
id=None, attributes=None, group=None):
super().__init__(type=AnnotationType.bbox,
points=[x, y, x + w, y + h], label=label,
points=[x, y, x + w, y + h], label=label, z_order=z_order,
id=id, attributes=attributes, group=group)
# pylint: enable=redefined-builtin
@ -368,7 +425,7 @@ class PointsObject(ShapeObject):
])
# pylint: disable=redefined-builtin
def __init__(self, points=None, visibility=None, label=None,
def __init__(self, points=None, visibility=None, label=None, z_order=None,
id=None, attributes=None, group=None):
if points is not None:
assert len(points) % 2 == 0
@ -381,10 +438,10 @@ class PointsObject(ShapeObject):
else:
visibility = []
for _ in range(len(points) // 2):
visibility.append(self.Visibility.absent)
visibility.append(self.Visibility.visible)
super().__init__(type=AnnotationType.points,
points=points, label=label,
points=points, label=label, z_order=z_order,
id=id, attributes=attributes, group=group)
self.visibility = visibility
@ -393,6 +450,17 @@ class PointsObject(ShapeObject):
def area(self):
return 0
def get_bbox(self):
xs = [p for p, v in zip(self.points[0::2], self.visibility)
if v != __class__.Visibility.absent]
ys = [p for p, v in zip(self.points[1::2], self.visibility)
if v != __class__.Visibility.absent]
x0 = min(xs, default=0)
x1 = max(xs, default=0)
y0 = min(ys, default=0)
y1 = max(ys, default=0)
return [x0, y0, x1 - x0, y1 - y0]
def __eq__(self, other):
if not super().__eq__(other):
return False

@ -4,8 +4,6 @@
# SPDX-License-Identifier: MIT
from collections import OrderedDict
from itertools import chain
import numpy as np
import os.path as osp
from pycocotools.coco import COCO
@ -13,7 +11,7 @@ import pycocotools.mask as mask_utils
from datumaro.components.extractor import (Extractor, DatasetItem,
DEFAULT_SUBSET_NAME, AnnotationType,
LabelObject, MaskObject, PointsObject, PolygonObject,
LabelObject, RleMask, PointsObject, PolygonObject,
BboxObject, CaptionObject,
LabelCategories, PointsCategories
)
@ -21,28 +19,6 @@ from datumaro.components.formats.ms_coco import CocoTask, CocoPath
from datumaro.util.image import lazy_image
class RleMask(MaskObject):
# pylint: disable=redefined-builtin
def __init__(self, rle=None, label=None,
id=None, attributes=None, group=None):
lazy_decode = lambda: mask_utils.decode(rle).astype(np.bool)
super().__init__(image=lazy_decode, label=label,
id=id, attributes=attributes, group=group)
self._rle = rle
# pylint: enable=redefined-builtin
def area(self):
return mask_utils.area(self._rle)
def bbox(self):
return mask_utils.toBbox(self._rle)
def __eq__(self, other):
if not isinstance(other, __class__):
return super().__eq__(other)
return self._rle == other._rle
class CocoExtractor(Extractor):
def __init__(self, path, task, merge_instance_polygons=False):
super().__init__()
@ -144,8 +120,7 @@ class CocoExtractor(Extractor):
anns = loader.getAnnIds(imgIds=img_id)
anns = loader.loadAnns(anns)
anns = list(chain(*(
self._load_annotations(ann, image_info) for ann in anns)))
anns = sum((self._load_annotations(a, image_info) for a in anns), [])
items[img_id] = DatasetItem(id=img_id, subset=self._subset,
image=image, annotations=anns)
@ -167,17 +142,26 @@ class CocoExtractor(Extractor):
if 'score' in ann:
attributes['score'] = ann['score']
if self._task is CocoTask.instances:
group = ann_id # make sure all tasks' annotations are merged
if self._task in [CocoTask.instances, CocoTask.person_keypoints]:
x, y, w, h = ann['bbox']
label_id = self._get_label_id(ann)
group = None
is_crowd = bool(ann['iscrowd'])
attributes['is_crowd'] = is_crowd
if self._task is CocoTask.person_keypoints:
keypoints = ann['keypoints']
points = [p for i, p in enumerate(keypoints) if i % 3 != 2]
visibility = keypoints[2::3]
parsed_annotations.append(
PointsObject(points, visibility, label=label_id,
id=ann_id, attributes=attributes, group=group)
)
segmentation = ann.get('segmentation')
if segmentation is not None:
group = ann_id
rle = None
if isinstance(segmentation, list):
@ -185,7 +169,7 @@ class CocoExtractor(Extractor):
for polygon_points in segmentation:
parsed_annotations.append(PolygonObject(
points=polygon_points, label=label_id,
id=ann_id, group=group, attributes=attributes
id=ann_id, attributes=attributes, group=group
))
if self._merge_instance_polygons:
@ -204,7 +188,7 @@ class CocoExtractor(Extractor):
if rle is not None:
parsed_annotations.append(RleMask(rle=rle, label=label_id,
id=ann_id, group=group, attributes=attributes
id=ann_id, attributes=attributes, group=group
))
parsed_annotations.append(
@ -214,30 +198,14 @@ class CocoExtractor(Extractor):
elif self._task is CocoTask.labels:
label_id = self._get_label_id(ann)
parsed_annotations.append(
LabelObject(label=label_id, id=ann_id, attributes=attributes)
)
elif self._task is CocoTask.person_keypoints:
keypoints = ann['keypoints']
points = [p for i, p in enumerate(keypoints) if i % 3 != 2]
visibility = keypoints[2::3]
bbox = ann.get('bbox')
label_id = self._get_label_id(ann)
group = None
if bbox is not None:
group = ann_id
parsed_annotations.append(
PointsObject(points, visibility, label=label_id,
LabelObject(label=label_id,
id=ann_id, attributes=attributes, group=group)
)
if bbox is not None:
parsed_annotations.append(
BboxObject(*bbox, label=label_id, group=group)
)
elif self._task is CocoTask.captions:
caption = ann['caption']
parsed_annotations.append(
CaptionObject(caption,
id=ann_id, attributes=attributes)
id=ann_id, attributes=attributes, group=group)
)
else:
raise NotImplementedError()

@ -34,14 +34,14 @@ def load_image(path):
from PIL import Image
image = Image.open(path)
image = np.asarray(image, dtype=np.float32)
if len(image.shape) == 3 and image.shape[2] in [3, 4]:
if len(image.shape) == 3 and image.shape[2] in {3, 4}:
image[:, :, :3] = image[:, :, 2::-1] # RGB to BGR
else:
raise NotImplementedError()
assert len(image.shape) in [2, 3]
assert len(image.shape) in {2, 3}
if len(image.shape) == 3:
assert image.shape[2] in [3, 4]
assert image.shape[2] in {3, 4}
return image
def save_image(path, image, params=None):
@ -60,7 +60,7 @@ def save_image(path, image, params=None):
params = {}
image = image.astype(np.uint8)
if len(image.shape) == 3 and image.shape[2] in [3, 4]:
if len(image.shape) == 3 and image.shape[2] in {3, 4}:
image[:, :, :3] = image[:, :, 2::-1] # BGR to RGB
image = Image.fromarray(image)
image.save(path, **params)
@ -92,7 +92,7 @@ def encode_image(image, ext, params=None):
params = {}
image = image.astype(np.uint8)
if len(image.shape) == 3 and image.shape[2] in [3, 4]:
if len(image.shape) == 3 and image.shape[2] in {3, 4}:
image[:, :, :3] = image[:, :, 2::-1] # BGR to RGB
image = Image.fromarray(image)
with BytesIO() as buffer:
@ -111,14 +111,14 @@ def decode_image(image_bytes):
from PIL import Image
image = Image.open(BytesIO(image_bytes))
image = np.asarray(image, dtype=np.float32)
if len(image.shape) == 3 and image.shape[2] in [3, 4]:
if len(image.shape) == 3 and image.shape[2] in {3, 4}:
image[:, :, :3] = image[:, :, 2::-1] # RGB to BGR
else:
raise NotImplementedError()
assert len(image.shape) in [2, 3]
assert len(image.shape) in {2, 3}
if len(image.shape) == 3:
assert image.shape[2] in [3, 4]
assert image.shape[2] in {3, 4}
return image
@ -131,7 +131,7 @@ class lazy_image:
# - False: do not cache
# - None: use default (don't store in a class variable)
# - object: use this object as a cache
assert cache in [None, False] or isinstance(cache, object)
assert cache in {None, False} or isinstance(cache, object)
self.cache = cache
def __call__(self):

@ -91,7 +91,7 @@ def lazy_mask(path, colormap=None):
return lazy_image(path, lambda path: load_mask(path, colormap))
def convert_mask_to_rle(binary_mask):
def mask_to_rle(binary_mask):
counts = []
for i, (value, elements) in enumerate(
groupby(binary_mask.ravel(order='F'))):
@ -103,4 +103,136 @@ def convert_mask_to_rle(binary_mask):
return {
'counts': counts,
'size': list(binary_mask.shape)
}
}
def mask_to_polygons(mask, tolerance=1.0, area_threshold=1):
    """
    Convert an instance mask to polygons

    Args:
        mask: a 2d binary mask
        tolerance: maximum distance from original points of
            a polygon to the approximated ones
        area_threshold: minimal area of generated polygons

    Returns:
        A list of polygons like [[x1,y1, x2,y2 ...], [...]]
    """
    from pycocotools import mask as mask_utils
    from skimage import measure

    polygons = []

    # pad mask with 0 around borders
    # (ensures contours touching the image edge are closed)
    padded_mask = np.pad(mask, pad_width=1, mode='constant', constant_values=0)
    contours = measure.find_contours(padded_mask, 0.5)
    # Fix coordinates after padding
    # NOTE(review): np.subtract over a list of per-contour arrays relies on
    # broadcasting across a ragged sequence — confirm it works on the
    # installed NumPy version when contours have different lengths
    contours = np.subtract(contours, 1)

    for contour in contours:
        if not np.array_equal(contour[0], contour[-1]):
            contour = np.vstack((contour, contour[0])) # make polygon closed
        # Simplify the contour within the given tolerance
        contour = measure.approximate_polygon(contour, tolerance)
        if len(contour) <= 2:
            # Degenerate contour (a point or a segment) — not a polygon
            continue

        # find_contours yields (row, col) points; COCO polygons are
        # (x, y)-interleaved, hence the axis flip; clip(0) removes small
        # negative coordinates introduced by the padding fix above
        contour = np.flip(contour, axis=1).flatten().clip(0) # [x0, y0, ...]

        # Check if the polygon is big enough
        rle = mask_utils.frPyObjects([contour], mask.shape[0], mask.shape[1])
        area = sum(mask_utils.area(rle))
        if area_threshold <= area:
            polygons.append(contour)

    return polygons
def crop_covered_segments(segments, width, height,
        iou_threshold=0.0, ratio_tolerance=0.001, area_threshold=1,
        return_masks=False):
    """
    Find all segments occluded by others and crop them to the visible part only.
    Input segments are expected to be sorted from background to foreground.

    Args:
        segments: 1d list of segment RLEs (in COCO format)
        width: width of the image
        height: height of the image
        iou_threshold: IoU threshold for objects to be counted as intersected
            By default is set to 0 to process any intersected objects
        ratio_tolerance: an IoU "handicap" value for a situation
            when an object is (almost) fully covered by another one and we
            don't want make a "hole" in the background object
        area_threshold: minimal area of included segments
        return_masks: if True, cropped polygon segments are returned as
            dense masks instead of being re-vectorized to polygons

    Returns:
        A list of input segments' parts (in the same order as input):
            [
                [[x1,y1, x2,y2 ...], ...], # input segment #0 parts
                mask1, # input segment #1 mask (if source segment is mask)
                [], # when source segment is too small
                ...
            ]
    """
    from pycocotools import mask as mask_utils

    # Wrap each segment in a list: frPyObjects expects a list of polygons/RLEs
    segments = [[s] for s in segments]
    input_rles = [mask_utils.frPyObjects(s, height, width) for s in segments]

    # For each (background) segment, collect the foreground segments that
    # overlap it, then subtract their union from its mask
    for i, rle_bottom in enumerate(input_rles):
        area_bottom = sum(mask_utils.area(rle_bottom))
        if area_bottom < area_threshold:
            # Too small to keep
            segments[i] = [] if not return_masks else None
            continue

        rles_top = []
        for j in range(i + 1, len(input_rles)):
            rle_top = input_rles[j]
            # iscrowd=[0, 0]: plain IoU, not the crowd-aware variant
            iou = sum(mask_utils.iou(rle_bottom, rle_top, [0, 0]))[0]

            if iou <= iou_threshold:
                continue

            area_top = sum(mask_utils.area(rle_top))
            area_ratio = area_top / area_bottom

            # If a segment is fully inside another one, skip this segment
            if abs(area_ratio - iou) < ratio_tolerance:
                continue

            # Check if the bottom segment is fully covered by the top one.
            # There is a mistake in the annotation, keep the background one
            if abs(1 / area_ratio - iou) < ratio_tolerance:
                rles_top = []
                break

            rles_top += rle_top

        # Nothing to crop and the source was polygons (not a dict RLE):
        # leave the original polygons untouched
        if not rles_top and not isinstance(segments[i][0], dict) \
                and not return_masks:
            continue

        rle_bottom = rle_bottom[0]
        bottom_mask = mask_utils.decode(rle_bottom).astype(np.uint8)

        if rles_top:
            # Subtract the union of the covering segments
            rle_top = mask_utils.merge(rles_top)
            top_mask = mask_utils.decode(rle_top).astype(np.uint8)

            bottom_mask -= top_mask
            # uint8 subtraction wraps around; reset any non-1 value to 0
            bottom_mask[bottom_mask != 1] = 0

        if not return_masks and not isinstance(segments[i][0], dict):
            # Source was polygons — re-vectorize the cropped mask
            segments[i] = mask_to_polygons(bottom_mask,
                area_threshold=area_threshold)
        else:
            segments[i] = bottom_mask

    return segments
def rles_to_mask(rles, width, height):
    """Rasterize and merge COCO segments into one binary mask of the
    given image size.
    """
    from pycocotools import mask as mask_utils

    encoded = mask_utils.frPyObjects(rles, height, width)
    merged = mask_utils.merge(encoded)
    return mask_utils.decode(merged)

@ -6,5 +6,6 @@ opencv-python>=4.1.0.25
Pillow>=6.1.0
pycocotools>=2.0.0
PyYAML>=5.1.1
scikit-image>=0.15.0
tensorboardX>=1.8
tensorflow>=1.12.0
tensorflow>=1.12.0

@ -56,6 +56,7 @@ setuptools.setup(
'Pillow',
'PyYAML',
'pycocotools',
'scikit-image',
'tensorboardX',
'tensorflow',
],
@ -64,4 +65,4 @@ setuptools.setup(
'datum=datumaro:main',
],
},
)
)

@ -160,28 +160,33 @@ class CocoConverterTest(TestCase):
self.assertFalse(item_b is None)
self.assertEqual(len(item_a.annotations), len(item_b.annotations))
for ann_a in item_a.annotations:
ann_b = find(item_b.annotations, lambda x: \
x.id == ann_a.id and \
x.type == ann_a.type and x.group == ann_a.group)
self.assertEqual(ann_a, ann_b, 'id: ' + str(ann_a.id))
# We might find few corresponding items, so check them all
ann_b_matches = [x for x in item_b.annotations
if x.id == ann_a.id and \
x.type == ann_a.type and x.group == ann_a.group]
self.assertFalse(len(ann_b_matches) == 0, 'aid: %s' % ann_a.id)
ann_b = find(ann_b_matches, lambda x: x == ann_a)
self.assertEqual(ann_a, ann_b, 'aid: %s' % ann_a.id)
item_b.annotations.remove(ann_b) # avoid repeats
def test_can_save_and_load_captions(self):
class TestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=0, subset='train',
DatasetItem(id=1, subset='train',
annotations=[
CaptionObject('hello', id=1),
CaptionObject('world', id=2),
CaptionObject('hello', id=1, group=1),
CaptionObject('world', id=2, group=2),
]),
DatasetItem(id=1, subset='train',
DatasetItem(id=2, subset='train',
annotations=[
CaptionObject('test', id=3),
CaptionObject('test', id=3, group=3),
]),
DatasetItem(id=2, subset='val',
DatasetItem(id=3, subset='val',
annotations=[
CaptionObject('word', id=1),
CaptionObject('word', id=1, group=1),
]
),
])
@ -191,95 +196,185 @@ class CocoConverterTest(TestCase):
CocoCaptionsConverter(), test_dir)
def test_can_save_and_load_instances(self):
label_categories = LabelCategories()
for i in range(10):
label_categories.add(str(i))
categories = { AnnotationType.label: label_categories }
class TestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=0, subset='train', image=np.ones((4, 4, 3)),
DatasetItem(id=1, subset='train', image=np.ones((4, 4, 3)),
annotations=[
# Bbox + single polygon
BboxObject(0, 1, 2, 3, label=2, group=1, id=1,
BboxObject(0, 1, 2, 2,
label=2, group=1, id=1,
attributes={ 'is_crowd': False }),
PolygonObject([0, 1, 2, 1, 2, 3, 0, 3],
attributes={ 'is_crowd': False },
label=2, group=1, id=1),
]),
DatasetItem(id=1, subset='train',
DatasetItem(id=2, subset='train', image=np.ones((4, 4, 3)),
annotations=[
# Mask + bbox
MaskObject(np.array([[0, 0, 0, 0], [1, 0, 1, 0],
[1, 1, 0, 0], [0, 0, 1, 0]],
dtype=np.bool),
MaskObject(np.array([
[0, 1, 0, 0],
[0, 1, 0, 0],
[0, 1, 1, 1],
[0, 0, 0, 0]],
),
attributes={ 'is_crowd': True },
label=4, group=3, id=3),
BboxObject(0, 1, 3, 3, label=4, group=3, id=3,
BboxObject(1, 0, 2, 2, label=4, group=3, id=3,
attributes={ 'is_crowd': True }),
]),
DatasetItem(id=3, subset='val',
DatasetItem(id=3, subset='val', image=np.ones((4, 4, 3)),
annotations=[
# Bbox + mask
BboxObject(0, 1, 3, 2, label=4, group=3, id=3,
BboxObject(0, 1, 2, 2, label=4, group=3, id=3,
attributes={ 'is_crowd': True }),
MaskObject(np.array([[0, 0, 0, 0], [1, 0, 1, 0],
[1, 1, 0, 0], [0, 0, 0, 0]],
dtype=np.bool),
MaskObject(np.array([
[0, 0, 0, 0],
[1, 1, 1, 0],
[1, 1, 0, 0],
[0, 0, 0, 0]],
),
attributes={ 'is_crowd': True },
label=4, group=3, id=3),
]),
])
def categories(self):
label_categories = LabelCategories()
for i in range(10):
label_categories.add(str(i))
return {
AnnotationType.label: label_categories,
}
return categories
with TestDir() as test_dir:
self._test_save_and_load(TestExtractor(),
CocoInstancesConverter(), test_dir)
def test_can_save_and_load_instances_with_mask_conversion(self):
def test_can_merge_polygons_on_loading(self):
label_categories = LabelCategories()
for i in range(10):
label_categories.add(str(i))
categories = { AnnotationType.label: label_categories }
class TestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=0, image=np.zeros((5, 5, 3)), subset='train',
DatasetItem(id=1, image=np.zeros((6, 10, 3)),
annotations=[
BboxObject(0, 0, 5, 5, label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
PolygonObject([0, 0, 4, 0, 4, 4],
label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
MaskObject(np.array([
[0, 1, 1, 1, 0],
[0, 0, 1, 1, 0],
[0, 0, 0, 1, 0],
[0, 0, 0, 0, 0],
[0, 0, 0, 0, 0]],
# only internal fragment (without the border),
# but not everywhere...
dtype=np.bool),
attributes={ 'is_crowd': False },
label=3, id=4, group=4),
PolygonObject([5, 0, 9, 0, 5, 5],
label=3, id=4, group=4),
]
),
])
def categories(self):
label_categories = LabelCategories()
for i in range(10):
label_categories.add(str(i))
return {
AnnotationType.label: label_categories,
}
return categories
class TargetExtractor(TestExtractor):
def __iter__(self):
items = list(super().__iter__())
items[0]._annotations = [
BboxObject(0, 0, 9, 5,
label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
PolygonObject([0, 0, 4, 0, 4, 4],
label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
PolygonObject([5, 0, 9, 0, 5, 5],
label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
MaskObject(np.array([
[0, 1, 1, 1, 0, 1, 1, 1, 1, 0],
[0, 0, 1, 1, 0, 1, 1, 1, 0, 0],
[0, 0, 0, 1, 0, 1, 1, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]],
# only internal fragment (without the border),
# but not everywhere...
),
label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
]
return iter(items)
with TestDir() as test_dir:
self._test_save_and_load(TestExtractor(),
CocoInstancesConverter(), test_dir,
{'merge_instance_polygons': True})
importer_params={'merge_instance_polygons': True},
target_dataset=TargetExtractor())
def test_can_crop_covered_segments(self):
label_categories = LabelCategories()
for i in range(10):
label_categories.add(str(i))
class SrcTestExtractor(Extractor):
def __iter__(self):
items = [
DatasetItem(id=1, image=np.zeros((5, 5, 3)),
annotations=[
MaskObject(np.array([
[0, 0, 1, 1, 1],
[0, 0, 1, 1, 1],
[1, 1, 0, 1, 1],
[1, 1, 1, 0, 0],
[1, 1, 1, 0, 0]],
),
label=2, id=1, z_order=0),
PolygonObject([1, 1, 4, 1, 4, 4, 1, 4],
label=1, id=2, z_order=1),
]
),
]
return iter(items)
def categories(self):
return { AnnotationType.label: label_categories }
class DstTestExtractor(Extractor):
def __iter__(self):
items = [
DatasetItem(id=1, image=np.zeros((5, 5, 3)),
annotations=[
BboxObject(0, 0, 4, 4,
label=2, id=1, group=1,
attributes={ 'is_crowd': True }),
MaskObject(np.array([
[0, 0, 1, 1, 1],
[0, 0, 0, 0, 1],
[1, 0, 0, 0, 1],
[1, 0, 0, 0, 0],
[1, 1, 1, 0, 0]],
),
attributes={ 'is_crowd': True },
label=2, id=1, group=1),
BboxObject(1, 1, 3, 3,
label=1, id=2, group=2,
attributes={ 'is_crowd': False }),
PolygonObject([1, 1, 4, 1, 4, 4, 1, 4],
label=1, id=2, group=2,
attributes={ 'is_crowd': False }),
# NOTE: Why it's 4 in COCOapi?..
]
),
]
return iter(items)
def test_can_merge_instance_polygons_to_mask_in_coverter(self):
def categories(self):
return { AnnotationType.label: label_categories }
with TestDir() as test_dir:
self._test_save_and_load(SrcTestExtractor(),
CocoInstancesConverter(crop_covered=True), test_dir,
target_dataset=DstTestExtractor())
def test_can_convert_polygons_to_mask(self):
label_categories = LabelCategories()
for i in range(10):
label_categories.add(str(i))
@ -287,14 +382,12 @@ class CocoConverterTest(TestCase):
class SrcTestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=0, image=np.zeros((5, 10, 3)),
DatasetItem(id=1, image=np.zeros((6, 10, 3)),
annotations=[
PolygonObject([0, 0, 4, 0, 4, 4],
label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
label=3, id=4, group=4),
PolygonObject([5, 0, 9, 0, 5, 5],
label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
label=3, id=4, group=4),
]
),
])
@ -305,19 +398,20 @@ class CocoConverterTest(TestCase):
class DstTestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=0, image=np.zeros((5, 10, 3)),
DatasetItem(id=1, image=np.zeros((6, 10, 3)),
annotations=[
BboxObject(1, 0, 8, 4, label=3, id=4, group=4,
BboxObject(0, 0, 9, 5, label=3, id=4, group=4,
attributes={ 'is_crowd': True }),
MaskObject(np.array([
[0, 1, 1, 1, 0, 1, 1, 1, 1, 0],
[0, 0, 1, 1, 0, 1, 1, 1, 0, 0],
[0, 0, 0, 1, 0, 1, 1, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]],
# only internal fragment (without the border),
# but not everywhere...
dtype=np.bool),
),
attributes={ 'is_crowd': True },
label=3, id=4, group=4),
]
@ -329,15 +423,69 @@ class CocoConverterTest(TestCase):
with TestDir() as test_dir:
self._test_save_and_load(SrcTestExtractor(),
CocoInstancesConverter(merge_polygons=True), test_dir,
CocoInstancesConverter(segmentation_mode='mask'), test_dir,
target_dataset=DstTestExtractor())
def test_can_convert_masks_to_polygons(self):
label_categories = LabelCategories()
for i in range(10):
label_categories.add(str(i))
class SrcTestExtractor(Extractor):
def __iter__(self):
items = [
DatasetItem(id=1, image=np.zeros((5, 10, 3)),
annotations=[
MaskObject(np.array([
[0, 1, 1, 1, 0, 1, 1, 1, 1, 0],
[0, 0, 1, 1, 0, 1, 1, 1, 0, 0],
[0, 0, 0, 1, 0, 1, 1, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
],
),
label=3, id=4, group=4),
]
),
]
return iter(items)
def categories(self):
return { AnnotationType.label: label_categories }
class DstTestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=1, image=np.zeros((5, 10, 3)),
annotations=[
BboxObject(1, 0, 7, 3, label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
PolygonObject(
[3.0, 2.5, 1.0, 0.0, 3.5, 0.0, 3.0, 2.5],
label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
PolygonObject(
[5.0, 3.5, 4.5, 0.0, 8.0, 0.0, 5.0, 3.5],
label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
]
),
])
def categories(self):
return { AnnotationType.label: label_categories }
with TestDir() as test_dir:
self._test_save_and_load(SrcTestExtractor(),
CocoInstancesConverter(segmentation_mode='polygons'), test_dir,
target_dataset=DstTestExtractor())
def test_can_save_and_load_images(self):
class TestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=0, subset='train'),
DatasetItem(id=1, subset='train'),
DatasetItem(id=2, subset='train'),
DatasetItem(id=2, subset='val'),
DatasetItem(id=3, subset='val'),
@ -354,19 +502,19 @@ class CocoConverterTest(TestCase):
class TestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=0, subset='train',
DatasetItem(id=1, subset='train',
annotations=[
LabelObject(4, id=1),
LabelObject(9, id=2),
LabelObject(4, id=1, group=1),
LabelObject(9, id=2, group=2),
]),
DatasetItem(id=1, subset='train',
DatasetItem(id=2, subset='train',
annotations=[
LabelObject(4, id=4),
LabelObject(4, id=4, group=4),
]),
DatasetItem(id=2, subset='val',
DatasetItem(id=3, subset='val',
annotations=[
LabelObject(2, id=1),
LabelObject(2, id=1, group=1),
]),
])
@ -383,62 +531,114 @@ class CocoConverterTest(TestCase):
CocoLabelsConverter(), test_dir)
def test_can_save_and_load_keypoints(self):
label_categories = LabelCategories()
points_categories = PointsCategories()
for i in range(10):
label_categories.add(str(i))
points_categories.add(i, [])
categories = {
AnnotationType.label: label_categories,
AnnotationType.points: points_categories,
}
class TestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=0, subset='train',
DatasetItem(id=1, subset='train', image=np.zeros((5, 5, 3)),
annotations=[
PointsObject([1, 2, 0, 2, 4, 1], [0, 1, 2],
# Full instance annotations: polygon + keypoints
PointsObject([0, 0, 0, 2, 4, 1], [0, 1, 2],
label=3, group=1, id=1),
PolygonObject([0, 0, 4, 0, 4, 4],
label=3, group=1, id=1),
BboxObject(1, 2, 3, 4, label=3, group=1),
PointsObject([5, 6, 0, 7], group=2, id=2),
BboxObject(1, 2, 3, 4, group=2),
# Full instance annotations: bbox + keypoints
PointsObject([1, 2, 3, 4, 2, 3], group=2, id=2),
BboxObject(1, 2, 2, 2, group=2, id=2),
]),
DatasetItem(id=1, subset='train',
DatasetItem(id=2, subset='train',
annotations=[
PointsObject([1, 2, 0, 2, 4, 1], label=5,
group=3, id=3),
BboxObject(1, 2, 3, 4, label=5, group=3),
# Solitary keypoints
PointsObject([1, 2, 0, 2, 4, 1], label=5, id=3),
]),
DatasetItem(id=2, subset='val',
DatasetItem(id=3, subset='val',
annotations=[
PointsObject([0, 2, 0, 2, 4, 1], label=2,
group=3, id=3),
BboxObject(0, 2, 4, 4, label=2, group=3),
# Solitary keypoints with no label
PointsObject([0, 0, 1, 2, 3, 4], [0, 1, 2], id=3),
]),
])
def categories(self):
label_categories = LabelCategories()
points_categories = PointsCategories()
for i in range(10):
label_categories.add(str(i))
points_categories.add(i, [])
return categories
return {
AnnotationType.label: label_categories,
AnnotationType.points: points_categories,
}
class DstTestExtractor(TestExtractor):
def __iter__(self):
return iter([
DatasetItem(id=1, subset='train', image=np.zeros((5, 5, 3)),
annotations=[
PointsObject([0, 0, 0, 2, 4, 1], [0, 1, 2],
label=3, group=1, id=1,
attributes={'is_crowd': False}),
PolygonObject([0, 0, 4, 0, 4, 4],
label=3, group=1, id=1,
attributes={'is_crowd': False}),
BboxObject(0, 0, 4, 4,
label=3, group=1, id=1,
attributes={'is_crowd': False}),
PointsObject([1, 2, 3, 4, 2, 3],
group=2, id=2,
attributes={'is_crowd': False}),
PolygonObject([1, 2, 3, 2, 3, 4, 1, 4],
group=2, id=2,
attributes={'is_crowd': False}),
BboxObject(1, 2, 2, 2,
group=2, id=2,
attributes={'is_crowd': False}),
]),
DatasetItem(id=2, subset='train',
annotations=[
PointsObject([1, 2, 0, 2, 4, 1],
label=5, group=3, id=3,
attributes={'is_crowd': False}),
PolygonObject([0, 1, 4, 1, 4, 2, 0, 2],
label=5, group=3, id=3,
attributes={'is_crowd': False}),
BboxObject(0, 1, 4, 1,
label=5, group=3, id=3,
attributes={'is_crowd': False}),
]),
DatasetItem(id=3, subset='val',
annotations=[
PointsObject([0, 0, 1, 2, 3, 4], [0, 1, 2],
group=3, id=3,
attributes={'is_crowd': False}),
PolygonObject([1, 2, 3, 2, 3, 4, 1, 4],
group=3, id=3,
attributes={'is_crowd': False}),
BboxObject(1, 2, 2, 2,
group=3, id=3,
attributes={'is_crowd': False}),
]),
])
with TestDir() as test_dir:
self._test_save_and_load(TestExtractor(),
CocoPersonKeypointsConverter(), test_dir)
CocoPersonKeypointsConverter(), test_dir,
target_dataset=DstTestExtractor())
def test_can_save_dataset_with_no_subsets(self):
class TestExtractor(Extractor):
def __iter__(self):
return iter([
DatasetItem(id=1, annotations=[
LabelObject(2, id=1),
LabelObject(2, id=1, group=1),
]),
DatasetItem(id=2, image=np.zeros((5, 5, 3)), annotations=[
LabelObject(3, id=3),
BboxObject(0, 0, 5, 5, label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
PolygonObject([0, 0, 4, 0, 4, 4], label=3, id=4, group=4,
attributes={ 'is_crowd': False }),
DatasetItem(id=2, annotations=[
LabelObject(3, id=2, group=2),
]),
])

@ -0,0 +1,69 @@
import numpy as np
from unittest import TestCase
import datumaro.util.mask_tools as mask_tools
class PolygonConversionsTest(TestCase):
    """Checks for mask <-> polygon conversion helpers in mask_tools."""

    def test_mask_can_be_converted_to_polygon(self):
        # A mask with two separate connected components should yield
        # one polygon per component.
        mask = np.array([
            [0, 1, 1, 1, 0, 1, 1, 1, 1, 0],
            [0, 0, 1, 1, 0, 1, 0, 1, 0, 0],
            [0, 0, 0, 1, 0, 1, 1, 0, 0, 0],
            [0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ])
        expected = [
            [1, 0, 3, 0, 3, 2, 1, 0],
            [5, 0, 8, 0, 5, 3],
        ]

        actual = mask_tools.mask_to_polygons(mask)

        # NOTE(review): only the polygon count is asserted here, the
        # vertex values in `expected` are not compared.
        self.assertEqual(len(expected), len(actual))

    def test_can_crop_covered_segments(self):
        image_size = [7, 7]
        # Three overlapping segments, bottom to top: a rectangle, an RLE
        # mask, and a lower-left triangle.
        initial = [
            [1, 1, 6, 1, 6, 6, 1, 6], # rectangle
            mask_tools.mask_to_rle(np.array([
                [0, 0, 0, 0, 0, 0, 0],
                [0, 0, 1, 0, 1, 1, 0],
                [0, 1, 1, 0, 1, 1, 0],
                [0, 0, 0, 0, 0, 1, 0],
                [0, 1, 1, 0, 0, 1, 0],
                [0, 1, 1, 1, 1, 1, 0],
                [0, 0, 0, 0, 0, 0, 0],
            ])),
            [1, 1, 6, 6, 1, 6], # lower-left triangle
        ]
        expected = [
            np.array([
                [0, 0, 0, 0, 0, 0, 0],
                [0, 0, 0, 1, 0, 0, 0],
                [0, 0, 0, 1, 0, 0, 0],
                [0, 0, 0, 0, 1, 0, 0],
                [0, 0, 0, 0, 0, 0, 0],
                [0, 0, 0, 0, 0, 0, 0],
                [0, 0, 0, 0, 0, 0, 0],
            ]), # half-covered
            np.array([
                [0, 0, 0, 0, 0, 0, 0],
                [0, 0, 1, 0, 1, 1, 0],
                [0, 0, 0, 0, 1, 1, 0],
                [0, 0, 0, 0, 0, 1, 0],
                [0, 0, 0, 0, 0, 1, 0],
                [0, 0, 0, 0, 0, 0, 0],
                [0, 0, 0, 0, 0, 0, 0],
            ]), # half-covered
            mask_tools.rles_to_mask([initial[2]], *image_size), # unchanged
        ]

        computed = mask_tools.crop_covered_segments(initial, *image_size,
            ratio_tolerance=0, return_masks=True)

        self.assertEqual(len(initial), len(computed))
        for idx, (want, got) in enumerate(zip(expected, computed)):
            self.assertTrue(np.array_equal(want, got),
                '#%s: %s\n%s\n' % (idx, want, got))
Loading…
Cancel
Save