Move annotation formats to dataset manager (#1256)
* Move formats to dataset manager * Unify datataset export and anno export implementations * Add track_id to TrackedShape, export tracked shapes * Replace MOT format * Replace LabelMe format * Add new formats to dm * Add dm tests * Extend TrackedShape * Enable dm test in CI * Fix tests * Add import * Fix tests * Fix mot track ids * Fix mot format * Update attribute logic in labelme tests * Use common code in yolo * Put datumaro in path in settings * Expect labels file in MOT next to annotations file * Add MOT format description * Add import * Add labelme format description * Linter fix * Linter fix2 * Compare attributes ordered * Update docs * Update testsmain
parent
e87ec38476
commit
887c6f0432
@ -1,307 +0,0 @@
|
||||
# Copyright (C) 2019 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
format_spec = {
|
||||
"name": "LabelMe",
|
||||
"dumpers": [
|
||||
{
|
||||
"display_name": "{name} {format} {version} for images",
|
||||
"format": "ZIP",
|
||||
"version": "3.0",
|
||||
"handler": "dump_as_labelme_annotation"
|
||||
}
|
||||
],
|
||||
"loaders": [
|
||||
{
|
||||
"display_name": "{name} {format} {version}",
|
||||
"format": "ZIP",
|
||||
"version": "3.0",
|
||||
"handler": "load",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
_DEFAULT_USERNAME = 'cvat'
|
||||
_MASKS_DIR = 'Masks'
|
||||
|
||||
|
||||
def dump_frame_anno(frame_annotation):
|
||||
from collections import defaultdict
|
||||
from lxml import etree as ET
|
||||
|
||||
root_elem = ET.Element('annotation')
|
||||
|
||||
ET.SubElement(root_elem, 'filename').text = frame_annotation.name
|
||||
ET.SubElement(root_elem, 'folder').text = ''
|
||||
|
||||
source_elem = ET.SubElement(root_elem, 'source')
|
||||
ET.SubElement(source_elem, 'sourceImage').text = ''
|
||||
ET.SubElement(source_elem, 'sourceAnnotation').text = 'CVAT'
|
||||
|
||||
image_elem = ET.SubElement(root_elem, 'imagesize')
|
||||
ET.SubElement(image_elem, 'nrows').text = str(frame_annotation.height)
|
||||
ET.SubElement(image_elem, 'ncols').text = str(frame_annotation.width)
|
||||
|
||||
groups = defaultdict(list)
|
||||
|
||||
for obj_id, shape in enumerate(frame_annotation.labeled_shapes):
|
||||
obj_elem = ET.SubElement(root_elem, 'object')
|
||||
ET.SubElement(obj_elem, 'name').text = str(shape.label)
|
||||
ET.SubElement(obj_elem, 'deleted').text = '0'
|
||||
ET.SubElement(obj_elem, 'verified').text = '0'
|
||||
ET.SubElement(obj_elem, 'occluded').text = \
|
||||
'yes' if shape.occluded else 'no'
|
||||
ET.SubElement(obj_elem, 'date').text = ''
|
||||
ET.SubElement(obj_elem, 'id').text = str(obj_id)
|
||||
|
||||
parts_elem = ET.SubElement(obj_elem, 'parts')
|
||||
if shape.group:
|
||||
groups[shape.group].append((obj_id, parts_elem))
|
||||
else:
|
||||
ET.SubElement(parts_elem, 'hasparts').text = ''
|
||||
ET.SubElement(parts_elem, 'ispartof').text = ''
|
||||
|
||||
if shape.type == 'rectangle':
|
||||
ET.SubElement(obj_elem, 'type').text = 'bounding_box'
|
||||
|
||||
poly_elem = ET.SubElement(obj_elem, 'polygon')
|
||||
x0, y0, x1, y1 = shape.points
|
||||
points = [ (x0, y0), (x1, y0), (x1, y1), (x0, y1) ]
|
||||
for x, y in points:
|
||||
point_elem = ET.SubElement(poly_elem, 'pt')
|
||||
ET.SubElement(point_elem, 'x').text = '%.2f' % x
|
||||
ET.SubElement(point_elem, 'y').text = '%.2f' % y
|
||||
|
||||
ET.SubElement(poly_elem, 'username').text = _DEFAULT_USERNAME
|
||||
elif shape.type == 'polygon':
|
||||
poly_elem = ET.SubElement(obj_elem, 'polygon')
|
||||
for x, y in zip(shape.points[::2], shape.points[1::2]):
|
||||
point_elem = ET.SubElement(poly_elem, 'pt')
|
||||
ET.SubElement(point_elem, 'x').text = '%.2f' % x
|
||||
ET.SubElement(point_elem, 'y').text = '%.2f' % y
|
||||
|
||||
ET.SubElement(poly_elem, 'username').text = _DEFAULT_USERNAME
|
||||
elif shape.type == 'polyline':
|
||||
pass
|
||||
elif shape.type == 'points':
|
||||
pass
|
||||
else:
|
||||
raise NotImplementedError("Unknown shape type '%s'" % shape.type)
|
||||
|
||||
attrs = ['%s=%s' % (a.name, a.value) for a in shape.attributes]
|
||||
ET.SubElement(obj_elem, 'attributes').text = ', '.join(attrs)
|
||||
|
||||
for _, group in groups.items():
|
||||
leader_id, leader_parts_elem = group[0]
|
||||
leader_parts = [str(o_id) for o_id, _ in group[1:]]
|
||||
ET.SubElement(leader_parts_elem, 'hasparts').text = \
|
||||
','.join(leader_parts)
|
||||
ET.SubElement(leader_parts_elem, 'ispartof').text = ''
|
||||
|
||||
for obj_id, parts_elem in group[1:]:
|
||||
ET.SubElement(parts_elem, 'hasparts').text = ''
|
||||
ET.SubElement(parts_elem, 'ispartof').text = str(leader_id)
|
||||
|
||||
return ET.tostring(root_elem, encoding='unicode', pretty_print=True)
|
||||
|
||||
def dump_as_labelme_annotation(file_object, annotations):
|
||||
import os.path as osp
|
||||
from zipfile import ZipFile, ZIP_DEFLATED
|
||||
|
||||
with ZipFile(file_object, 'w', compression=ZIP_DEFLATED) as output_zip:
|
||||
for frame_annotation in annotations.group_by_frame():
|
||||
xml_data = dump_frame_anno(frame_annotation)
|
||||
filename = osp.splitext(frame_annotation.name)[0] + '.xml'
|
||||
output_zip.writestr(filename, xml_data)
|
||||
|
||||
def parse_xml_annotations(xml_data, annotations, input_zip):
|
||||
from datumaro.util.mask_tools import mask_to_polygons
|
||||
from io import BytesIO
|
||||
from lxml import etree as ET
|
||||
import numpy as np
|
||||
import os.path as osp
|
||||
from PIL import Image
|
||||
|
||||
def parse_attributes(attributes_string):
|
||||
parsed = []
|
||||
if not attributes_string:
|
||||
return parsed
|
||||
|
||||
read = attributes_string.split(',')
|
||||
read = [a.strip() for a in read if a.strip()]
|
||||
for attr in read:
|
||||
if '=' in attr:
|
||||
name, value = attr.split('=', maxsplit=1)
|
||||
parsed.append(annotations.Attribute(name, value))
|
||||
else:
|
||||
parsed.append(annotations.Attribute(attr, '1'))
|
||||
|
||||
return parsed
|
||||
|
||||
|
||||
root_elem = ET.fromstring(xml_data)
|
||||
|
||||
frame_number = annotations.match_frame(root_elem.find('filename').text)
|
||||
|
||||
parsed_annotations = dict()
|
||||
group_assignments = dict()
|
||||
root_annotations = set()
|
||||
for obj_elem in root_elem.iter('object'):
|
||||
obj_id = int(obj_elem.find('id').text)
|
||||
|
||||
ann_items = []
|
||||
|
||||
attributes = []
|
||||
attributes_elem = obj_elem.find('attributes')
|
||||
if attributes_elem is not None and attributes_elem.text:
|
||||
attributes = parse_attributes(attributes_elem.text)
|
||||
|
||||
occluded = False
|
||||
occluded_elem = obj_elem.find('occluded')
|
||||
if occluded_elem is not None and occluded_elem.text:
|
||||
occluded = (occluded_elem.text == 'yes')
|
||||
|
||||
deleted = False
|
||||
deleted_elem = obj_elem.find('deleted')
|
||||
if deleted_elem is not None and deleted_elem.text:
|
||||
deleted = bool(int(deleted_elem.text))
|
||||
|
||||
poly_elem = obj_elem.find('polygon')
|
||||
segm_elem = obj_elem.find('segm')
|
||||
type_elem = obj_elem.find('type') # the only value is 'bounding_box'
|
||||
if poly_elem is not None:
|
||||
points = []
|
||||
for point_elem in poly_elem.iter('pt'):
|
||||
x = float(point_elem.find('x').text)
|
||||
y = float(point_elem.find('y').text)
|
||||
points.append(x)
|
||||
points.append(y)
|
||||
label = obj_elem.find('name').text
|
||||
if label and attributes:
|
||||
label_id = annotations._get_label_id(label)
|
||||
if label_id:
|
||||
attributes = [a for a in attributes
|
||||
if annotations._get_attribute_id(label_id, a.name)
|
||||
]
|
||||
else:
|
||||
attributes = []
|
||||
else:
|
||||
attributes = []
|
||||
|
||||
if type_elem is not None and type_elem.text == 'bounding_box':
|
||||
xmin = min(points[::2])
|
||||
xmax = max(points[::2])
|
||||
ymin = min(points[1::2])
|
||||
ymax = max(points[1::2])
|
||||
ann_items.append(annotations.LabeledShape(
|
||||
type='rectangle',
|
||||
frame=frame_number,
|
||||
label=label,
|
||||
points=[xmin, ymin, xmax, ymax],
|
||||
occluded=occluded,
|
||||
attributes=attributes,
|
||||
))
|
||||
else:
|
||||
ann_items.append(annotations.LabeledShape(
|
||||
type='polygon',
|
||||
frame=frame_number,
|
||||
label=label,
|
||||
points=points,
|
||||
occluded=occluded,
|
||||
attributes=attributes,
|
||||
))
|
||||
elif segm_elem is not None:
|
||||
label = obj_elem.find('name').text
|
||||
if label and attributes:
|
||||
label_id = annotations._get_label_id(label)
|
||||
if label_id:
|
||||
attributes = [a for a in attributes
|
||||
if annotations._get_attribute_id(label_id, a.name)
|
||||
]
|
||||
else:
|
||||
attributes = []
|
||||
else:
|
||||
attributes = []
|
||||
|
||||
mask_file = segm_elem.find('mask').text
|
||||
mask = input_zip.read(osp.join(_MASKS_DIR, mask_file))
|
||||
mask = np.asarray(Image.open(BytesIO(mask)).convert('L'))
|
||||
mask = (mask != 0)
|
||||
polygons = mask_to_polygons(mask)
|
||||
|
||||
for polygon in polygons:
|
||||
ann_items.append(annotations.LabeledShape(
|
||||
type='polygon',
|
||||
frame=frame_number,
|
||||
label=label,
|
||||
points=polygon,
|
||||
occluded=occluded,
|
||||
attributes=attributes,
|
||||
))
|
||||
|
||||
if not deleted:
|
||||
parsed_annotations[obj_id] = ann_items
|
||||
|
||||
parts_elem = obj_elem.find('parts')
|
||||
if parts_elem is not None:
|
||||
children_ids = []
|
||||
hasparts_elem = parts_elem.find('hasparts')
|
||||
if hasparts_elem is not None and hasparts_elem.text:
|
||||
children_ids = [int(c) for c in hasparts_elem.text.split(',')]
|
||||
|
||||
parent_ids = []
|
||||
ispartof_elem = parts_elem.find('ispartof')
|
||||
if ispartof_elem is not None and ispartof_elem.text:
|
||||
parent_ids = [int(c) for c in ispartof_elem.text.split(',')]
|
||||
|
||||
if children_ids and not parent_ids and hasparts_elem.text:
|
||||
root_annotations.add(obj_id)
|
||||
group_assignments[obj_id] = [None, children_ids]
|
||||
|
||||
# assign a single group to the whole subtree
|
||||
current_group_id = 0
|
||||
annotations_to_visit = list(root_annotations)
|
||||
while annotations_to_visit:
|
||||
ann_id = annotations_to_visit.pop()
|
||||
ann_assignment = group_assignments[ann_id]
|
||||
group_id, children_ids = ann_assignment
|
||||
if group_id:
|
||||
continue
|
||||
|
||||
if ann_id in root_annotations:
|
||||
current_group_id += 1 # start a new group
|
||||
|
||||
group_id = current_group_id
|
||||
ann_assignment[0] = group_id
|
||||
|
||||
# continue with children
|
||||
annotations_to_visit.extend(children_ids)
|
||||
|
||||
assert current_group_id == len(root_annotations)
|
||||
|
||||
for ann_id, ann_items in parsed_annotations.items():
|
||||
group_id = 0
|
||||
if ann_id in group_assignments:
|
||||
ann_assignment = group_assignments[ann_id]
|
||||
group_id = ann_assignment[0]
|
||||
|
||||
for ann_item in ann_items:
|
||||
if group_id:
|
||||
ann_item = ann_item._replace(group=group_id)
|
||||
if isinstance(ann_item, annotations.LabeledShape):
|
||||
annotations.add_shape(ann_item)
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
def load(file_object, annotations):
|
||||
from zipfile import ZipFile
|
||||
|
||||
with ZipFile(file_object, 'r') as input_zip:
|
||||
for filename in input_zip.namelist():
|
||||
if not filename.endswith('.xml'):
|
||||
continue
|
||||
|
||||
xml_data = input_zip.read(filename)
|
||||
parse_xml_annotations(xml_data, annotations, input_zip)
|
||||
@ -1,109 +0,0 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
format_spec = {
|
||||
"name": "MOT",
|
||||
"dumpers": [
|
||||
{
|
||||
"display_name": "{name} {format} {version}",
|
||||
"format": "CSV",
|
||||
"version": "1.0",
|
||||
"handler": "dump"
|
||||
},
|
||||
],
|
||||
"loaders": [
|
||||
{
|
||||
"display_name": "{name} {format} {version}",
|
||||
"format": "CSV",
|
||||
"version": "1.0",
|
||||
"handler": "load",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
MOT = [
|
||||
"frame_id",
|
||||
"track_id",
|
||||
"xtl",
|
||||
"ytl",
|
||||
"width",
|
||||
"height",
|
||||
"confidence",
|
||||
"class_id",
|
||||
"visibility"
|
||||
]
|
||||
|
||||
|
||||
def dump(file_object, annotations):
|
||||
""" Export track shapes in MOT CSV format. Due to limitations of the MOT
|
||||
format, this process only supports rectangular interpolation mode
|
||||
annotations.
|
||||
"""
|
||||
import csv
|
||||
import io
|
||||
|
||||
# csv requires a text buffer
|
||||
with io.TextIOWrapper(file_object, encoding="utf-8") as csv_file:
|
||||
writer = csv.DictWriter(csv_file, fieldnames=MOT)
|
||||
for i, track in enumerate(annotations.tracks):
|
||||
for shape in track.shapes:
|
||||
# MOT doesn't support polygons or 'outside' property
|
||||
if shape.type != 'rectangle':
|
||||
continue
|
||||
writer.writerow({
|
||||
"frame_id": shape.frame,
|
||||
"track_id": i,
|
||||
"xtl": shape.points[0],
|
||||
"ytl": shape.points[1],
|
||||
"width": shape.points[2] - shape.points[0],
|
||||
"height": shape.points[3] - shape.points[1],
|
||||
"confidence": 1,
|
||||
"class_id": track.label,
|
||||
"visibility": 1 - int(shape.occluded)
|
||||
})
|
||||
|
||||
|
||||
def load(file_object, annotations):
|
||||
""" Read MOT CSV format and convert objects to annotated tracks.
|
||||
"""
|
||||
import csv
|
||||
import io
|
||||
tracks = {}
|
||||
# csv requires a text buffer
|
||||
with io.TextIOWrapper(file_object, encoding="utf-8") as csv_file:
|
||||
reader = csv.DictReader(csv_file, fieldnames=MOT)
|
||||
for row in reader:
|
||||
# create one shape per row
|
||||
xtl = float(row["xtl"])
|
||||
ytl = float(row["ytl"])
|
||||
xbr = xtl + float(row["width"])
|
||||
ybr = ytl + float(row["height"])
|
||||
shape = annotations.TrackedShape(
|
||||
type="rectangle",
|
||||
points=[xtl, ytl, xbr, ybr],
|
||||
occluded=float(row["visibility"]) == 0,
|
||||
outside=False,
|
||||
keyframe=False,
|
||||
z_order=0,
|
||||
frame=int(row["frame_id"]),
|
||||
attributes=[],
|
||||
)
|
||||
# build trajectories as lists of shapes in track dict
|
||||
track_id = int(row["track_id"])
|
||||
if track_id not in tracks:
|
||||
tracks[track_id] = annotations.Track(row["class_id"], track_id, [])
|
||||
tracks[track_id].shapes.append(shape)
|
||||
for track in tracks.values():
|
||||
# Set outside=True for the last shape since MOT has no support
|
||||
# for this flag
|
||||
last = annotations.TrackedShape(
|
||||
type=track.shapes[-1].type,
|
||||
points=track.shapes[-1].points,
|
||||
occluded=track.shapes[-1].occluded,
|
||||
outside=True,
|
||||
keyframe=track.shapes[-1].keyframe,
|
||||
z_order=track.shapes[-1].z_order,
|
||||
frame=track.shapes[-1].frame,
|
||||
attributes=track.shapes[-1].attributes,
|
||||
)
|
||||
track.shapes[-1] = last
|
||||
annotations.add_track(track)
|
||||
@ -0,0 +1,309 @@
|
||||
|
||||
# Copyright (C) 2020 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
class _GitImportFix:
|
||||
import sys
|
||||
former_path = sys.path[:]
|
||||
|
||||
@classmethod
|
||||
def apply(cls):
|
||||
# HACK: fix application and module name clash
|
||||
# 'git' app is found earlier than a library in the path.
|
||||
# The clash is introduced by unittest discover
|
||||
import sys
|
||||
print('apply')
|
||||
|
||||
apps_dir = __file__[:__file__.rfind('/dataset_manager/')]
|
||||
assert 'apps' in apps_dir
|
||||
try:
|
||||
sys.path.remove(apps_dir)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
for name in list(sys.modules):
|
||||
if name.startswith('git.') or name == 'git':
|
||||
m = sys.modules.pop(name, None)
|
||||
del m
|
||||
|
||||
import git
|
||||
assert apps_dir not in git.__file__
|
||||
|
||||
@classmethod
|
||||
def restore(cls):
|
||||
import sys
|
||||
print('restore')
|
||||
|
||||
for name in list(sys.modules):
|
||||
if name.startswith('git.') or name == 'git':
|
||||
m = sys.modules.pop(name)
|
||||
del m
|
||||
|
||||
sys.path.insert(0, __file__[:__file__.rfind('/dataset_manager/')])
|
||||
|
||||
import importlib
|
||||
importlib.invalidate_caches()
|
||||
|
||||
def _setUpModule():
|
||||
_GitImportFix.apply()
|
||||
import cvat.apps.dataset_manager.task as dm
|
||||
from cvat.apps.engine.models import Task
|
||||
globals()['dm'] = dm
|
||||
globals()['Task'] = Task
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, __file__[:__file__.rfind('/dataset_manager/')])
|
||||
|
||||
def tearDownModule():
|
||||
_GitImportFix.restore()
|
||||
|
||||
from io import BytesIO
|
||||
import os
|
||||
import random
|
||||
import tempfile
|
||||
|
||||
from PIL import Image
|
||||
from django.contrib.auth.models import User, Group
|
||||
from rest_framework.test import APITestCase, APIClient
|
||||
from rest_framework import status
|
||||
|
||||
_setUpModule()
|
||||
|
||||
|
||||
def generate_image_file(filename):
|
||||
f = BytesIO()
|
||||
width = random.randint(10, 200)
|
||||
height = random.randint(10, 200)
|
||||
image = Image.new('RGB', size=(width, height))
|
||||
image.save(f, 'jpeg')
|
||||
f.name = filename
|
||||
f.seek(0)
|
||||
|
||||
return f
|
||||
|
||||
def create_db_users(cls):
|
||||
group_user, _ = Group.objects.get_or_create(name="user")
|
||||
|
||||
user_dummy = User.objects.create_superuser(username="test", password="test", email="")
|
||||
user_dummy.groups.add(group_user)
|
||||
|
||||
cls.user = user_dummy
|
||||
|
||||
class ForceLogin:
|
||||
def __init__(self, user, client):
|
||||
self.user = user
|
||||
self.client = client
|
||||
|
||||
def __enter__(self):
|
||||
if self.user:
|
||||
self.client.force_login(self.user,
|
||||
backend='django.contrib.auth.backends.ModelBackend')
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, exception_type, exception_value, traceback):
|
||||
if self.user:
|
||||
self.client.logout()
|
||||
|
||||
class TaskExportTest(APITestCase):
|
||||
def setUp(self):
|
||||
self.client = APIClient()
|
||||
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
create_db_users(cls)
|
||||
|
||||
def _generate_task(self):
|
||||
task = {
|
||||
"name": "my task #1",
|
||||
"owner": '',
|
||||
"assignee": '',
|
||||
"overlap": 0,
|
||||
"segment_size": 100,
|
||||
"z_order": False,
|
||||
"labels": [
|
||||
{
|
||||
"name": "car",
|
||||
"attributes": [
|
||||
{
|
||||
"name": "model",
|
||||
"mutable": False,
|
||||
"input_type": "select",
|
||||
"default_value": "mazda",
|
||||
"values": ["bmw", "mazda", "renault"]
|
||||
},
|
||||
{
|
||||
"name": "parked",
|
||||
"mutable": True,
|
||||
"input_type": "checkbox",
|
||||
"default_value": False
|
||||
},
|
||||
]
|
||||
},
|
||||
{"name": "person"},
|
||||
]
|
||||
}
|
||||
task = self._create_task(task, 3)
|
||||
|
||||
annotations = {
|
||||
"version": 0,
|
||||
"tags": [
|
||||
{
|
||||
"frame": 0,
|
||||
"label_id": task["labels"][0]["id"],
|
||||
"group": None,
|
||||
"attributes": []
|
||||
}
|
||||
],
|
||||
"shapes": [
|
||||
{
|
||||
"frame": 0,
|
||||
"label_id": task["labels"][0]["id"],
|
||||
"group": None,
|
||||
"attributes": [
|
||||
{
|
||||
"spec_id": task["labels"][0]["attributes"][0]["id"],
|
||||
"value": task["labels"][0]["attributes"][0]["values"][0]
|
||||
},
|
||||
{
|
||||
"spec_id": task["labels"][0]["attributes"][1]["id"],
|
||||
"value": task["labels"][0]["attributes"][0]["default_value"]
|
||||
}
|
||||
],
|
||||
"points": [1.0, 2.1, 100, 300.222],
|
||||
"type": "rectangle",
|
||||
"occluded": False
|
||||
},
|
||||
{
|
||||
"frame": 1,
|
||||
"label_id": task["labels"][1]["id"],
|
||||
"group": None,
|
||||
"attributes": [],
|
||||
"points": [2.0, 2.1, 100, 300.222, 400, 500, 1, 3],
|
||||
"type": "polygon",
|
||||
"occluded": False
|
||||
},
|
||||
],
|
||||
"tracks": [
|
||||
{
|
||||
"frame": 0,
|
||||
"label_id": task["labels"][0]["id"],
|
||||
"group": None,
|
||||
"attributes": [
|
||||
{
|
||||
"spec_id": task["labels"][0]["attributes"][0]["id"],
|
||||
"value": task["labels"][0]["attributes"][0]["values"][0]
|
||||
},
|
||||
],
|
||||
"shapes": [
|
||||
{
|
||||
"frame": 0,
|
||||
"points": [1.0, 2.1, 100, 300.222],
|
||||
"type": "rectangle",
|
||||
"occluded": False,
|
||||
"outside": False,
|
||||
"attributes": [
|
||||
{
|
||||
"spec_id": task["labels"][0]["attributes"][1]["id"],
|
||||
"value": task["labels"][0]["attributes"][1]["default_value"]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"frame": 1,
|
||||
"attributes": [],
|
||||
"points": [2.0, 2.1, 100, 300.222],
|
||||
"type": "rectangle",
|
||||
"occluded": True,
|
||||
"outside": True
|
||||
},
|
||||
]
|
||||
},
|
||||
{
|
||||
"frame": 1,
|
||||
"label_id": task["labels"][1]["id"],
|
||||
"group": None,
|
||||
"attributes": [],
|
||||
"shapes": [
|
||||
{
|
||||
"frame": 1,
|
||||
"attributes": [],
|
||||
"points": [1.0, 2.1, 100, 300.222],
|
||||
"type": "rectangle",
|
||||
"occluded": False,
|
||||
"outside": False
|
||||
}
|
||||
]
|
||||
},
|
||||
]
|
||||
}
|
||||
self._put_api_v1_task_id_annotations(task["id"], annotations)
|
||||
|
||||
return task, annotations
|
||||
|
||||
def _create_task(self, data, size):
|
||||
with ForceLogin(self.user, self.client):
|
||||
response = self.client.post('/api/v1/tasks', data=data, format="json")
|
||||
assert response.status_code == status.HTTP_201_CREATED, response.status_code
|
||||
tid = response.data["id"]
|
||||
|
||||
images = {
|
||||
"client_files[%d]" % i: generate_image_file("image_%d.jpg" % i)
|
||||
for i in range(size)
|
||||
}
|
||||
images["image_quality"] = 75
|
||||
response = self.client.post("/api/v1/tasks/{}/data".format(tid), data=images)
|
||||
assert response.status_code == status.HTTP_202_ACCEPTED, response.status_code
|
||||
|
||||
response = self.client.get("/api/v1/tasks/{}".format(tid))
|
||||
task = response.data
|
||||
|
||||
return task
|
||||
|
||||
def _put_api_v1_task_id_annotations(self, tid, data):
|
||||
with ForceLogin(self.user, self.client):
|
||||
response = self.client.put("/api/v1/tasks/{}/annotations".format(tid),
|
||||
data=data, format="json")
|
||||
|
||||
return response
|
||||
|
||||
def _test_export(self, format_name, save_images=False):
|
||||
self.assertTrue(format_name in [f['tag'] for f in dm.EXPORT_FORMATS])
|
||||
|
||||
task, _ = self._generate_task()
|
||||
project = dm.TaskProject.from_task(
|
||||
Task.objects.get(pk=task["id"]), self.user.username)
|
||||
|
||||
with tempfile.TemporaryDirectory() as test_dir:
|
||||
project.export(format_name, test_dir, save_images=save_images)
|
||||
|
||||
self.assertTrue(os.listdir(test_dir))
|
||||
|
||||
def test_datumaro(self):
|
||||
self._test_export(dm.EXPORT_FORMAT_DATUMARO_PROJECT, save_images=False)
|
||||
|
||||
def test_coco(self):
|
||||
self._test_export('cvat_coco', save_images=True)
|
||||
|
||||
def test_voc(self):
|
||||
self._test_export('cvat_voc', save_images=True)
|
||||
|
||||
def test_tf_detection_api(self):
|
||||
self._test_export('cvat_tfrecord', save_images=True)
|
||||
|
||||
def test_yolo(self):
|
||||
self._test_export('cvat_yolo', save_images=True)
|
||||
|
||||
def test_mot(self):
|
||||
self._test_export('cvat_mot', save_images=True)
|
||||
|
||||
def test_labelme(self):
|
||||
self._test_export('cvat_label_me', save_images=True)
|
||||
|
||||
def test_formats_query(self):
|
||||
formats = dm.get_export_formats()
|
||||
|
||||
expected = set(f['tag'] for f in dm.EXPORT_FORMATS)
|
||||
actual = set(f['tag'] for f in formats)
|
||||
self.assertSetEqual(expected, actual)
|
||||
@ -0,0 +1,68 @@
|
||||
# Copyright (C) 2019 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
format_spec = {
|
||||
"name": "LabelMe",
|
||||
"dumpers": [
|
||||
{
|
||||
"display_name": "{name} {format} {version}",
|
||||
"format": "ZIP",
|
||||
"version": "3.0",
|
||||
"handler": "dump"
|
||||
}
|
||||
],
|
||||
"loaders": [
|
||||
{
|
||||
"display_name": "{name} {format} {version}",
|
||||
"format": "ZIP",
|
||||
"version": "3.0",
|
||||
"handler": "load",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
from datumaro.components.converter import Converter
|
||||
class CvatLabelMeConverter(Converter):
|
||||
def __init__(self, save_images=False):
|
||||
self._save_images = save_images
|
||||
|
||||
def __call__(self, extractor, save_dir):
|
||||
from datumaro.components.project import Environment, Dataset
|
||||
|
||||
env = Environment()
|
||||
id_from_image = env.transforms.get('id_from_image_name')
|
||||
|
||||
extractor = extractor.transform(id_from_image)
|
||||
extractor = Dataset.from_extractors(extractor) # apply lazy transforms
|
||||
|
||||
converter = env.make_converter('label_me', save_images=self._save_images)
|
||||
converter(extractor, save_dir=save_dir)
|
||||
|
||||
def dump(file_object, annotations):
|
||||
from cvat.apps.dataset_manager.bindings import CvatAnnotationsExtractor
|
||||
from cvat.apps.dataset_manager.util import make_zip_archive
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
extractor = CvatAnnotationsExtractor('', annotations)
|
||||
converter = CvatLabelMeConverter()
|
||||
with TemporaryDirectory() as temp_dir:
|
||||
converter(extractor, save_dir=temp_dir)
|
||||
make_zip_archive(temp_dir, file_object)
|
||||
|
||||
def load(file_object, annotations):
|
||||
from pyunpack import Archive
|
||||
from tempfile import TemporaryDirectory
|
||||
from datumaro.plugins.labelme_format import LabelMeImporter
|
||||
from datumaro.components.project import Environment
|
||||
from cvat.apps.dataset_manager.bindings import import_dm_annotations
|
||||
|
||||
archive_file = file_object if isinstance(file_object, str) else getattr(file_object, "name")
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
Archive(archive_file).extractall(tmp_dir)
|
||||
|
||||
dm_dataset = LabelMeImporter()(tmp_dir).make_dataset()
|
||||
masks_to_polygons = Environment().transforms.get('masks_to_polygons')
|
||||
dm_dataset = dm_dataset.transform(masks_to_polygons)
|
||||
import_dm_annotations(dm_dataset, annotations)
|
||||
@ -0,0 +1,89 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
format_spec = {
|
||||
"name": "MOT",
|
||||
"dumpers": [
|
||||
{
|
||||
"display_name": "{name} {format} {version}",
|
||||
"format": "ZIP",
|
||||
"version": "1.1",
|
||||
"handler": "dump"
|
||||
},
|
||||
],
|
||||
"loaders": [
|
||||
{
|
||||
"display_name": "{name} {format} {version}",
|
||||
"format": "ZIP",
|
||||
"version": "1.1",
|
||||
"handler": "load",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
from datumaro.plugins.mot_format import \
|
||||
MotSeqGtConverter as _MotConverter
|
||||
class CvatMotConverter(_MotConverter):
|
||||
NAME = 'cvat_mot'
|
||||
|
||||
def dump(file_object, annotations):
|
||||
from cvat.apps.dataset_manager.bindings import CvatAnnotationsExtractor
|
||||
from cvat.apps.dataset_manager.util import make_zip_archive
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
extractor = CvatAnnotationsExtractor('', annotations)
|
||||
converter = CvatMotConverter()
|
||||
with TemporaryDirectory() as temp_dir:
|
||||
converter(extractor, save_dir=temp_dir)
|
||||
make_zip_archive(temp_dir, file_object)
|
||||
|
||||
|
||||
def load(file_object, annotations):
|
||||
from pyunpack import Archive
|
||||
from tempfile import TemporaryDirectory
|
||||
from datumaro.plugins.mot_format import MotSeqImporter
|
||||
import datumaro.components.extractor as datumaro
|
||||
from cvat.apps.dataset_manager.bindings import match_frame
|
||||
|
||||
archive_file = file_object if isinstance(file_object, str) else getattr(file_object, "name")
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
Archive(archive_file).extractall(tmp_dir)
|
||||
|
||||
tracks = {}
|
||||
|
||||
dm_dataset = MotSeqImporter()(tmp_dir).make_dataset()
|
||||
label_cat = dm_dataset.categories()[datumaro.AnnotationType.label]
|
||||
|
||||
for item in dm_dataset:
|
||||
frame_id = match_frame(item, annotations)
|
||||
|
||||
for ann in item.annotations:
|
||||
if ann.type != datumaro.AnnotationType.bbox:
|
||||
continue
|
||||
|
||||
track_id = ann.attributes.get('track_id')
|
||||
if track_id is None:
|
||||
continue
|
||||
|
||||
shape = annotations.TrackedShape(
|
||||
type='rectangle',
|
||||
points=ann.points,
|
||||
occluded=ann.attributes.get('occluded') == True,
|
||||
outside=False,
|
||||
keyframe=False,
|
||||
z_order=ann.z_order,
|
||||
frame=frame_id,
|
||||
attributes=[],
|
||||
)
|
||||
|
||||
# build trajectories as lists of shapes in track dict
|
||||
if track_id not in tracks:
|
||||
tracks[track_id] = annotations.Track(
|
||||
label_cat.items[ann.label].name, 0, [])
|
||||
tracks[track_id].shapes.append(shape)
|
||||
|
||||
for track in tracks.values():
|
||||
# MOT annotations do not require frames to be ordered
|
||||
track.shapes.sort(key=lambda t: t.frame)
|
||||
# Set outside=True for the last shape in a track to finish the track
|
||||
track.shapes[-1] = track.shapes[-1]._replace(outside=True)
|
||||
annotations.add_track(track)
|
||||
Loading…
Reference in New Issue