You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
190 lines
6.8 KiB
Python
190 lines
6.8 KiB
Python
# Copyright (C) 2021 Intel Corporation
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
import os
|
|
import re
|
|
import hashlib
|
|
import mimetypes
|
|
import cv2 as cv
|
|
from av import VideoFrame
|
|
|
|
def rotate_image(image, angle):
|
|
height, width = image.shape[:2]
|
|
image_center = (width/2, height/2)
|
|
matrix = cv.getRotationMatrix2D(image_center, angle, 1.)
|
|
abs_cos = abs(matrix[0,0])
|
|
abs_sin = abs(matrix[0,1])
|
|
bound_w = int(height * abs_sin + width * abs_cos)
|
|
bound_h = int(height * abs_cos + width * abs_sin)
|
|
matrix[0, 2] += bound_w/2 - image_center[0]
|
|
matrix[1, 2] += bound_h/2 - image_center[1]
|
|
matrix = cv.warpAffine(image, matrix, (bound_w, bound_h))
|
|
return matrix
|
|
|
|
def md5_hash(frame):
|
|
if isinstance(frame, VideoFrame):
|
|
frame = frame.to_image()
|
|
return hashlib.md5(frame.tobytes()).hexdigest() # nosec
|
|
|
|
def _define_data_type(media):
|
|
return mimetypes.guess_type(media)[0]
|
|
|
|
def is_video(media_file):
|
|
data_type = _define_data_type(media_file)
|
|
return data_type is not None and data_type.startswith('video')
|
|
|
|
def is_image(media_file):
|
|
data_type = _define_data_type(media_file)
|
|
return data_type is not None and data_type.startswith('image') and \
|
|
not data_type.startswith('image/svg')
|
|
|
|
|
|
def _list_and_join(root):
|
|
files = os.listdir(root)
|
|
for f in files:
|
|
yield os.path.join(root, f)
|
|
|
|
def _prepare_context_list(files, base_dir):
|
|
return sorted(map(lambda x: os.path.relpath(x, base_dir), filter(is_image, files)))
|
|
|
|
# Expected 2D format is:
|
|
# data/
|
|
# 00001.png
|
|
# related_images/
|
|
# 00001_png/
|
|
# context_image_1.jpeg
|
|
# context_image_2.png
|
|
def _detect_related_images_2D(image_paths, root_path):
|
|
related_images = {}
|
|
latest_dirname = ''
|
|
related_images_exist = False
|
|
|
|
for image_path in sorted(image_paths):
|
|
rel_image_path = os.path.relpath(image_path, root_path)
|
|
dirname = os.path.dirname(image_path)
|
|
related_images_dirname = os.path.join(dirname, 'related_images')
|
|
related_images[rel_image_path] = []
|
|
|
|
if latest_dirname == dirname and not related_images_exist:
|
|
continue
|
|
elif latest_dirname != dirname:
|
|
# Update some data applicable for a subset of paths (within the current dirname)
|
|
latest_dirname = dirname
|
|
related_images_exist = os.path.isdir(related_images_dirname)
|
|
|
|
if related_images_exist:
|
|
related_images_dirname = os.path.join(
|
|
related_images_dirname, '_'.join(os.path.basename(image_path).rsplit('.', 1))
|
|
)
|
|
|
|
if os.path.isdir(related_images_dirname):
|
|
related_images[rel_image_path] = _prepare_context_list(_list_and_join(related_images_dirname), root_path)
|
|
return related_images
|
|
|
|
# Possible 3D formats are:
|
|
# velodyne_points/
|
|
# data/
|
|
# image_01.bin
|
|
# IMAGE_00 # any number?
|
|
# data/
|
|
# image_01.png
|
|
|
|
# pointcloud/
|
|
# 00001.pcd
|
|
# related_images/
|
|
# 00001_pcd/
|
|
# image_01.png # or other image
|
|
|
|
# Default formats
|
|
# Option 1
|
|
# data/
|
|
# image.pcd
|
|
# image.png
|
|
|
|
# Option 2
|
|
# data/
|
|
# image_1/
|
|
# image_1.pcd
|
|
# context_1.png
|
|
# context_2.jpg
|
|
def _detect_related_images_3D(image_paths, root_path):
|
|
related_images = {}
|
|
latest_dirname = ''
|
|
dirname_files = []
|
|
related_images_exist = False
|
|
velodyne_context_images_dirs = []
|
|
|
|
for image_path in sorted(image_paths):
|
|
rel_image_path = os.path.relpath(image_path, root_path)
|
|
name = os.path.splitext(os.path.basename(image_path))[0]
|
|
dirname = os.path.dirname(image_path)
|
|
related_images_dirname = os.path.normpath(os.path.join(dirname, '..', 'related_images'))
|
|
related_images[rel_image_path] = []
|
|
|
|
if latest_dirname != dirname:
|
|
# Update some data applicable for a subset of paths (within the current dirname)
|
|
latest_dirname = dirname
|
|
related_images_exist = os.path.isdir(related_images_dirname)
|
|
dirname_files = list(_list_and_join(dirname))
|
|
velodyne_context_images_dirs = [directory for directory
|
|
in _list_and_join(os.path.normpath(os.path.join(dirname, '..', '..')))
|
|
if os.path.isdir(os.path.join(directory, 'data')) and re.search(r'image_\d.*', directory, re.IGNORECASE)
|
|
]
|
|
|
|
filtered_dirname_files = list(filter(lambda x: x != image_path, dirname_files))
|
|
if len(filtered_dirname_files) and os.path.basename(dirname) == name:
|
|
# default format (option 2)
|
|
related_images[rel_image_path].extend(_prepare_context_list(filtered_dirname_files, root_path))
|
|
else:
|
|
filtered_dirname_files = list(
|
|
filter(lambda x: os.path.splitext(os.path.basename(x))[0] == name, filtered_dirname_files)
|
|
)
|
|
if len(filtered_dirname_files):
|
|
# default format (option 1)
|
|
related_images[rel_image_path].extend(_prepare_context_list(filtered_dirname_files, root_path))
|
|
|
|
if related_images_exist:
|
|
related_images_dirname = os.path.join(
|
|
related_images_dirname, '_'.join(os.path.basename(image_path).rsplit('.', 1))
|
|
)
|
|
if os.path.isdir(related_images_dirname):
|
|
related_images[rel_image_path].extend(
|
|
_prepare_context_list(_list_and_join(related_images_dirname), root_path)
|
|
)
|
|
|
|
if dirname.endswith(os.path.join('velodyne_points', 'data')):
|
|
# velodynepoints format
|
|
for context_images_dir in velodyne_context_images_dirs:
|
|
context_files = _list_and_join(os.path.join(context_images_dir, 'data'))
|
|
context_files = list(
|
|
filter(lambda x: os.path.splitext(os.path.basename(x))[0] == name, context_files)
|
|
)
|
|
related_images[rel_image_path].extend(
|
|
_prepare_context_list(context_files, root_path)
|
|
)
|
|
|
|
related_images[rel_image_path].sort()
|
|
return related_images
|
|
|
|
# This function is expected to be called only for images tasks
|
|
# image_path is expected to be a list of absolute path to images
|
|
# root_path is expected to be a string (dataset root)
|
|
def detect_related_images(image_paths, root_path):
|
|
data_are_2d = False
|
|
data_are_3d = False
|
|
|
|
# First of all need to define data type we are working with
|
|
for image_path in image_paths:
|
|
# .bin files are expected to be converted to .pcd before this code
|
|
if os.path.splitext(image_path)[1].lower() == '.pcd':
|
|
data_are_3d = True
|
|
else:
|
|
data_are_2d = True
|
|
assert not (data_are_3d and data_are_2d), 'Combined data types 2D and 3D are not supported'
|
|
|
|
if data_are_2d:
|
|
return _detect_related_images_2D(image_paths, root_path)
|
|
elif data_are_3d:
|
|
return _detect_related_images_3D(image_paths, root_path)
|
|
return {}
|