@ -8,13 +8,17 @@ import shutil
import zipfile
import io
import itertools
import struct
import re
from abc import ABC , abstractmethod
import av
import numpy as np
from pyunpack import Archive
from PIL import Image , ImageFile
import open3d as o3d
from cvat . apps . engine . utils import rotate_image
from cvat . apps . engine . models import DimensionType
# fixes: "OSError:broken data stream" when executing line 72 while loading images downloaded from the web
# see: https://stackoverflow.com/questions/42462431/oserror-broken-data-stream-when-reading-image-file
@ -179,7 +183,8 @@ class PdfReader(ImageListReader):
class ZipReader ( ImageListReader ) :
def __init__ ( self , source_path , step = 1 , start = 0 , stop = None ) :
self . _zip_source = zipfile . ZipFile ( source_path [ 0 ] , mode = ' r ' )
self . _dimension = DimensionType . DIM_2D
self . _zip_source = zipfile . ZipFile ( source_path [ 0 ] , mode = ' a ' )
self . extract_dir = source_path [ 1 ] if len ( source_path ) > 1 else None
file_list = [ f for f in self . _zip_source . namelist ( ) if get_mime ( f ) == ' image ' ]
super ( ) . __init__ ( file_list , step , start , stop )
@ -188,18 +193,42 @@ class ZipReader(ImageListReader):
self . _zip_source . close ( )
def get_preview ( self ) :
if self . _dimension == DimensionType . DIM_3D :
fp = open ( os . path . join ( os . path . dirname ( __file__ ) , ' assets/3d_preview.jpeg ' ) , " rb " )
return self . _get_preview ( fp )
io_image = io . BytesIO ( self . _zip_source . read ( self . _source_path [ 0 ] ) )
return self . _get_preview ( io_image )
def get_image_size ( self , i ) :
if self . _dimension == DimensionType . DIM_3D :
with self . _zip_source . open ( self . _source_path [ i ] , " r " ) as file :
properties = ValidateDimension . get_pcd_properties ( file )
return int ( properties [ " WIDTH " ] ) , int ( properties [ " HEIGHT " ] )
img = Image . open ( io . BytesIO ( self . _zip_source . read ( self . _source_path [ i ] ) ) )
return img . width , img . height
def get_image ( self , i ) :
return io . BytesIO ( self . _zip_source . read ( self . _source_path [ i ] ) )
def add_files ( self , source_path ) :
root_path = os . path . split ( self . _zip_source . filename ) [ 0 ]
for path in source_path :
self . _zip_source . write ( path , path . replace ( root_path , " " ) )
def get_zip_filename ( self ) :
return self . _zip_source . filename
def reconcile ( self , source_files , step = 1 , start = 0 , stop = None , dimension = DimensionType . DIM_2D ) :
self . _dimension = dimension
super ( ) . __init__ (
source_path = source_files ,
step = step ,
start = start ,
stop = stop
)
def get_path ( self , i ) :
if self . _zip_source . filename :
if self . _zip_source . filename :
return os . path . join ( os . path . dirname ( self . _zip_source . filename ) , self . _source_path [ i ] ) \
if not self . extract_dir else os . path . join ( self . extract_dir , self . _source_path [ i ] )
else : # necessary for mime_type definition
@ -283,8 +312,9 @@ class VideoReader(IMediaReader):
return image . width , image . height
class IChunkWriter ( ABC ) :
def __init__ ( self , quality ):
def __init__ ( self , quality , dimension = DimensionType . DIM_2D ):
self . _image_quality = quality
self . _dimension = dimension
@staticmethod
def _compress_image ( image_path , quality ) :
@ -326,12 +356,20 @@ class ZipCompressedChunkWriter(IChunkWriter):
def save_as_chunk ( self , images , chunk_path ) :
image_sizes = [ ]
with zipfile . ZipFile ( chunk_path , ' x ' ) as zip_chunk :
for idx , ( image , _ , _ ) in enumerate ( images ) :
w , h , image_buf = self . _compress_image ( image , self . _image_quality )
for idx , ( image , _ , _ ) in enumerate ( images ) :
if self . _dimension == DimensionType . DIM_2D :
w , h , image_buf = self . _compress_image ( image , self . _image_quality )
extension = " jpeg "
else :
image_buf = open ( image , " rb " ) if isinstance ( image , str ) else image
properties = ValidateDimension . get_pcd_properties ( image_buf )
w , h = int ( properties [ " WIDTH " ] ) , int ( properties [ " HEIGHT " ] )
extension = " pcd "
image_buf . seek ( 0 , 0 )
image_buf = io . BytesIO ( image_buf . read ( ) )
image_sizes . append ( ( w , h ) )
arcname = ' {:06d} .jpeg ' . format ( idx )
arcname = ' {:06d} . {} ' . format ( idx , extension )
zip_chunk . writestr ( arcname , image_buf . getvalue ( ) )
return image_sizes
class Mpeg4ChunkWriter ( IChunkWriter ) :
@ -511,3 +549,173 @@ MEDIA_TYPES = {
' unique ' : True ,
}
}
class ValidateDimension :
def __init__ ( self , path = None ) :
self . dimension = DimensionType . DIM_2D
self . path = path
self . related_files = { }
self . image_files = { }
self . converted_files = [ ]
@staticmethod
def get_pcd_properties ( fp , verify_version = False ) :
kv = { }
pcd_version = [ " 0.7 " , " 0.6 " , " 0.5 " , " 0.4 " , " 0.3 " , " 0.2 " , " 0.1 " ,
" .7 " , " .6 " , " .5 " , " .4 " , " .3 " , " .2 " , " .1 " ]
try :
for line in fp :
line = line . decode ( " utf-8 " )
if line . startswith ( " # " ) :
continue
k , v = line . split ( " " , maxsplit = 1 )
kv [ k ] = v . strip ( )
if " DATA " in line :
break
if verify_version :
if " VERSION " in kv and kv [ " VERSION " ] in pcd_version :
return True
return None
return kv
except AttributeError :
return None
@staticmethod
def convert_bin_to_pcd ( path , delete_source = True ) :
list_pcd = [ ]
with open ( path , " rb " ) as f :
size_float = 4
byte = f . read ( size_float * 4 )
while byte :
x , y , z , _ = struct . unpack ( " ffff " , byte )
list_pcd . append ( [ x , y , z ] )
byte = f . read ( size_float * 4 )
np_pcd = np . asarray ( list_pcd )
pcd = o3d . geometry . PointCloud ( )
pcd . points = o3d . utility . Vector3dVector ( np_pcd )
pcd_filename = path . replace ( " .bin " , " .pcd " )
o3d . io . write_point_cloud ( pcd_filename , pcd )
if delete_source :
os . remove ( path )
return pcd_filename
def set_path ( self , path ) :
self . path = path
def bin_operation ( self , file_path , actual_path ) :
pcd_path = ValidateDimension . convert_bin_to_pcd ( file_path )
self . converted_files . append ( pcd_path )
return pcd_path . split ( actual_path ) [ - 1 ] [ 1 : ]
@staticmethod
def pcd_operation ( file_path , actual_path ) :
with open ( file_path , " rb " ) as file :
is_pcd = ValidateDimension . get_pcd_properties ( file , verify_version = True )
return file_path . split ( actual_path ) [ - 1 ] [ 1 : ] if is_pcd else file_path
def process_files ( self , root , actual_path , files ) :
pcd_files = { }
for file in files :
file_name , file_extension = file . rsplit ( ' . ' , maxsplit = 1 )
file_path = os . path . abspath ( os . path . join ( root , file ) )
if file_extension == " bin " :
path = self . bin_operation ( file_path , actual_path )
pcd_files [ file_name ] = path
self . related_files [ path ] = [ ]
elif file_extension == " pcd " :
path = ValidateDimension . pcd_operation ( file_path , actual_path )
if path == file_path :
self . image_files [ file_name ] = file_path
else :
pcd_files [ file_name ] = path
self . related_files [ path ] = [ ]
else :
self . image_files [ file_name ] = file_path
return pcd_files
def validate_velodyne_points ( self , * args ) :
root , actual_path , files = args
velodyne_files = self . process_files ( root , actual_path , files )
related_path = os . path . split ( os . path . split ( root ) [ 0 ] ) [ 0 ]
path_list = [ re . search ( r ' image_ \ d.* ' , path , re . IGNORECASE ) for path in os . listdir ( related_path ) if
os . path . isdir ( os . path . join ( related_path , path ) ) ]
for path_ in path_list :
if path_ :
path = os . path . join ( path_ . group ( ) , " data " )
path = os . path . abspath ( os . path . join ( related_path , path ) )
files = [ file for file in os . listdir ( path ) if
os . path . isfile ( os . path . abspath ( os . path . join ( path , file ) ) ) ]
for file in files :
f_name = file . split ( " . " ) [ 0 ]
if velodyne_files . get ( f_name , None ) :
self . related_files [ velodyne_files [ f_name ] ] . append (
os . path . abspath ( os . path . join ( path , file ) ) )
def validate_pointcloud ( self , * args ) :
root , actual_path , files = args
pointcloud_files = self . process_files ( root , actual_path , files )
related_path = root . split ( " pointcloud " ) [ 0 ]
related_images_path = os . path . join ( related_path , " related_images " )
if os . path . isdir ( related_images_path ) :
paths = [ path for path in os . listdir ( related_images_path ) if
os . path . isdir ( os . path . abspath ( os . path . join ( related_images_path , path ) ) ) ]
for k in pointcloud_files :
for path in paths :
if k == path . split ( " _ " ) [ 0 ] :
file_path = os . path . abspath ( os . path . join ( related_images_path , path ) )
files = [ file for file in os . listdir ( file_path ) if
os . path . isfile ( os . path . join ( file_path , file ) ) ]
for related_image in files :
self . related_files [ pointcloud_files [ k ] ] . append ( os . path . join ( file_path , related_image ) )
def validate_default ( self , * args ) :
root , actual_path , files = args
pcd_files = self . process_files ( root , actual_path , files )
if len ( list ( pcd_files . keys ( ) ) ) :
for image in self . image_files . keys ( ) :
if pcd_files . get ( image , None ) :
self . related_files [ pcd_files [ image ] ] . append ( self . image_files [ image ] )
current_directory_name = os . path . split ( root )
if len ( pcd_files . keys ( ) ) == 1 :
pcd_name = list ( pcd_files . keys ( ) ) [ 0 ] . split ( " . " ) [ 0 ]
if current_directory_name [ 1 ] == pcd_name :
for related_image in self . image_files . values ( ) :
if root == os . path . split ( related_image ) [ 0 ] :
self . related_files [ pcd_files [ pcd_name ] ] . append ( related_image )
def validate ( self ) :
"""
Validate the directory structure for kitty and point cloud format .
"""
if not self . path :
return
actual_path = self . path
for root , _ , files in os . walk ( actual_path ) :
if root . endswith ( " data " ) :
if os . path . split ( os . path . split ( root ) [ 0 ] ) [ 1 ] == " velodyne_points " :
self . validate_velodyne_points ( root , actual_path , files )
elif os . path . split ( root ) [ - 1 ] == " pointcloud " :
self . validate_pointcloud ( root , actual_path , files )
else :
self . validate_default ( root , actual_path , files )
if len ( self . related_files . keys ( ) ) :
self . dimension = DimensionType . DIM_3D