Small preview and progress (#1331)

* Reduce preview size (untested)

* Fix tests

* Improve media readers (untested)

* Fix migration

* Fix frame provider

* Fix preview save

* Fix stop frame handling

* Handle duration == None

* Codacy fixes

* Add missing import

* Unify iteration over frames for media readers and fix a corner case when the user specifies stop_frame = 0

Co-authored-by: Nikita Manovich <nikita.manovich@intel.com>
Andrey Zhavoronkov authored 6 years ago · committed by GitHub
parent ddf452c557
commit e87ec38476

@@ -5,6 +5,7 @@
 import math
 from io import BytesIO
 from enum import Enum
+import itertools

 import numpy as np
 from PIL import Image
@@ -80,7 +81,7 @@ class FrameProvider():
             extracted_chunk = chunk_number
             chunk_reader = reader_class([chunk_path])

-        frame, frame_name = chunk_reader[frame_offset]
+        frame, frame_name, _ = next(itertools.islice(chunk_reader, frame_offset, None))
         if reader_class is VideoReader:
             return (self._av_frame_to_png_bytes(frame), 'image/png')
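
For illustration (not part of the commit): readers now behave as plain iterators of (image, path, frame_number) tuples, so random access is done by skipping ahead with itertools.islice rather than indexing. A minimal sketch with made-up frame tuples:

```python
import itertools

def nth(iterable, n):
    # Skip the first n items of an iterator and return the next one.
    return next(itertools.islice(iterable, n, None))

frames = iter([('im0', 'a.png', 0), ('im1', 'b.png', 1), ('im2', 'c.png', 2)])
frame, frame_name, _ = nth(frames, 1)
assert (frame, frame_name) == ('im1', 'b.png')
```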

@@ -6,8 +6,7 @@ import os
 import tempfile
 import shutil
 import zipfile
-from io import BytesIO
-import itertools
+import io
 from abc import ABC, abstractmethod

 import av
@@ -25,6 +24,13 @@ def get_mime(name):
     return 'unknown'

+def create_tmp_dir():
+    return tempfile.mkdtemp(prefix='cvat-', suffix='.data')
+
+def delete_tmp_dir(tmp_dir):
+    if tmp_dir:
+        shutil.rmtree(tmp_dir)
+
 class IMediaReader(ABC):
     def __init__(self, source_path, step, start, stop):
         self._source_path = sorted(source_path)
@@ -32,78 +38,74 @@ class IMediaReader(ABC):
         self._start = start
         self._stop = stop

-    @staticmethod
-    def create_tmp_dir():
-        return tempfile.mkdtemp(prefix='cvat-', suffix='.data')
-
-    @staticmethod
-    def delete_tmp_dir(tmp_dir):
-        if tmp_dir:
-            shutil.rmtree(tmp_dir)
-
     @abstractmethod
     def __iter__(self):
         pass

     @abstractmethod
-    def __getitem__(self, k):
+    def get_preview(self):
         pass

     @abstractmethod
-    def save_preview(self, preview_path):
+    def get_progress(self, pos):
         pass

-    def slice_by_size(self, size):
-        # stopFrame should be included
-        it = itertools.islice(self, self._start, self._stop + 1 if self._stop else None)
-        frames = list(itertools.islice(it, 0, size * self._step, self._step))
-        while frames:
-            yield frames
-            frames = list(itertools.islice(it, 0, size * self._step, self._step))
-
-    @property
-    @abstractmethod
-    def image_names(self):
-        pass
+    @staticmethod
+    def _get_preview(obj):
+        if isinstance(obj, io.IOBase):
+            preview = Image.open(obj)
+        else:
+            preview = obj
+        preview.thumbnail((128, 128))
+
+        return preview.convert('RGB')

     @abstractmethod
     def get_image_size(self):
         pass

-#Note step, start, stop have no affect
 class ImageListReader(IMediaReader):
-    def __init__(self, source_path, step=1, start=0, stop=0):
+    def __init__(self, source_path, step=1, start=0, stop=None):
         if not source_path:
             raise Exception('No image found')

+        if stop is None:
+            stop = len(source_path)
+        else:
+            stop = min(len(source_path), stop + 1)
+        step = max(step, 1)
+        assert stop > start
+
         super().__init__(
             source_path=source_path,
-            step=1,
-            start=0,
-            stop=0,
+            step=step,
+            start=start,
+            stop=stop,
         )

     def __iter__(self):
-        return zip(self._source_path, self.image_names)
+        for i in range(self._start, self._stop, self._step):
+            yield (self.get_image(i), self.get_path(i), i)

-    def __getitem__(self, k):
-        return (self._source_path[k], self.image_names[k])
+    def get_path(self, i):
+        return self._source_path[i]

-    def __len__(self):
-        return len(self._source_path)
+    def get_image(self, i):
+        return self._source_path[i]

-    def save_preview(self, preview_path):
-        shutil.copyfile(self._source_path[0], preview_path)
+    def get_progress(self, pos):
+        return (pos - self._start + 1) / (self._stop - self._start)

-    @property
-    def image_names(self):
-        return self._source_path
+    def get_preview(self):
+        fp = open(self._source_path[0], "rb")
+        return self._get_preview(fp)

     def get_image_size(self):
         img = Image.open(self._source_path[0])
         return img.width, img.height

-#Note step, start, stop have no affect
 class DirectoryReader(ImageListReader):
-    def __init__(self, source_path, step=1, start=0, stop=0):
+    def __init__(self, source_path, step=1, start=0, stop=None):
         image_paths = []
         for source in source_path:
             for root, _, files in os.walk(source):
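
For illustration (not part of the commit): the unified contract the readers now share. Every reader yields (image, path, frame_number) tuples and reports progress from the last yielded frame number. ToyReader and its names are hypothetical:

```python
class ToyReader:
    def __init__(self, paths, step=1, start=0, stop=None):
        # stop is inclusive for the caller, exclusive internally, as above.
        self._paths = paths
        self._step = max(step, 1)
        self._start = start
        self._stop = len(paths) if stop is None else min(len(paths), stop + 1)

    def __iter__(self):
        for i in range(self._start, self._stop, self._step):
            yield (self._paths[i], self._paths[i], i)

    def get_progress(self, pos):
        return (pos - self._start + 1) / (self._stop - self._start)

reader = ToyReader(['a.png', 'b.png', 'c.png', 'd.png'], step=2)
assert [frame_number for _, _, frame_number in reader] == [0, 2]
assert reader.get_progress(2) == 0.75
```
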
@@ -112,41 +114,38 @@ class DirectoryReader(ImageListReader):
             image_paths.extend(paths)
         super().__init__(
             source_path=image_paths,
-            step=1,
-            start=0,
-            stop=0,
+            step=step,
+            start=start,
+            stop=stop,
         )

-#Note step, start, stop have no affect
 class ArchiveReader(DirectoryReader):
-    def __init__(self, source_path, step=1, start=0, stop=0):
-        self._tmp_dir = self.create_tmp_dir()
+    def __init__(self, source_path, step=1, start=0, stop=None):
+        self._tmp_dir = create_tmp_dir()
         self._archive_source = source_path[0]
         Archive(self._archive_source).extractall(self._tmp_dir)
         super().__init__(
             source_path=[self._tmp_dir],
-            step=1,
-            start=0,
-            stop=0,
+            step=step,
+            start=start,
+            stop=stop,
         )

     def __del__(self):
-        if (self._tmp_dir):
-            self.delete_tmp_dir(self._tmp_dir)
+        delete_tmp_dir(self._tmp_dir)

-    @property
-    def image_names(self):
-        return [os.path.join(os.path.dirname(self._archive_source), os.path.relpath(p, self._tmp_dir)) for p in super().image_names]
+    def get_path(self, i):
+        base_dir = os.path.dirname(self._archive_source)
+        return os.path.join(base_dir, os.path.relpath(self._source_path[i], self._tmp_dir))

-#Note step, start, stop have no affect
 class PdfReader(DirectoryReader):
-    def __init__(self, source_path, step=1, start=0, stop=0):
+    def __init__(self, source_path, step=1, start=0, stop=None):
         if not source_path:
             raise Exception('No PDF found')

         from pdf2image import convert_from_path
         self._pdf_source = source_path[0]
-        self._tmp_dir = self.create_tmp_dir()
+        self._tmp_dir = create_tmp_dir()
         file_ = convert_from_path(self._pdf_source)
         basename = os.path.splitext(os.path.basename(self._pdf_source))[0]
         for page_num, page in enumerate(file_):
@@ -155,96 +154,88 @@ class PdfReader(DirectoryReader):
         super().__init__(
             source_path=[self._tmp_dir],
-            step=1,
-            start=0,
-            stop=0,
+            step=step,
+            start=start,
+            stop=stop,
         )

     def __del__(self):
-        if (self._tmp_dir):
-            self.delete_tmp_dir(self._tmp_dir)
+        delete_tmp_dir(self._tmp_dir)

-    @property
-    def image_names(self):
-        return [os.path.join(os.path.dirname(self._pdf_source), os.path.relpath(p, self._tmp_dir)) for p in super().image_names]
+    def get_path(self, i):
+        base_dir = os.path.dirname(self._pdf_source)
+        return os.path.join(base_dir, os.path.relpath(self._source_path[i], self._tmp_dir))

-class ZipReader(IMediaReader):
-    def __init__(self, source_path, step=1, start=0, stop=0):
+class ZipReader(ImageListReader):
+    def __init__(self, source_path, step=1, start=0, stop=None):
         self._zip_source = zipfile.ZipFile(source_path[0], mode='r')
+        file_list = [f for f in self._zip_source.namelist() if get_mime(f) == 'image']
+        super().__init__(file_list, step, start, stop)

-    def __iter__(self):
-        for f in zip(self._source_path, self.image_names):
-            yield (BytesIO(self._zip_source.read(f[0])), f[1])
-
-    def __len__(self):
-        return len(self._source_path)
-
-    def __getitem__(self, k):
-        return (BytesIO(self._zip_source.read(self._source_path[k])), self.image_names[k])
-
     def __del__(self):
         self._zip_source.close()

-    def save_preview(self, preview_path):
-        with open(preview_path, 'wb') as f:
-            f.write(self._zip_source.read(self._source_path[0]))
+    def get_preview(self):
+        io_image = io.BytesIO(self._zip_source.read(self._source_path[0]))
+        return self._get_preview(io_image)

     def get_image_size(self):
-        img = Image.open(BytesIO(self._zip_source.read(self._source_path[0])))
+        img = Image.open(io.BytesIO(self._zip_source.read(self._source_path[0])))
         return img.width, img.height

-    @property
-    def image_names(self):
-        return [os.path.join(os.path.dirname(self._zip_source.filename), p) for p in self._source_path]
+    def get_image(self, i):
+        return io.BytesIO(self._zip_source.read(self._source_path[i]))

-class VideoReader(IMediaReader):
-    def __init__(self, source_path, step=1, start=0, stop=0):
-        self._output_fps = 25
+    def get_path(self, i):
+        return os.path.join(os.path.dirname(self._zip_source.filename), self._source_path[i])

+class VideoReader(IMediaReader):
+    def __init__(self, source_path, step=1, start=0, stop=None):
         super().__init__(
             source_path=source_path,
             step=step,
             start=start,
-            stop=stop,
+            stop=stop + 1 if stop is not None else stop,
         )

-    def __iter__(self):
-        def decode_frames(container):
-            for packet in container.demux():
-                if packet.stream.type == 'video':
-                    for frame in packet.decode():
-                        yield frame
+    def _has_frame(self, i):
+        if i >= self._start:
+            if (i - self._start) % self._step == 0:
+                if self._stop is None or i < self._stop:
+                    return True
+
+        return False
+
+    def _decode(self, container):
+        frame_num = 0
+        for packet in container.demux():
+            if packet.stream.type == 'video':
+                for image in packet.decode():
+                    frame_num += 1
+                    if self._has_frame(frame_num - 1):
+                        yield (image, self._source_path[0], image.pts)

+    def __iter__(self):
         container = self._get_av_container()
         source_video_stream = container.streams.video[0]
         source_video_stream.thread_type = 'AUTO'

-        image_names = self.image_names
-        return itertools.zip_longest(decode_frames(container), image_names, fillvalue=image_names[0])
+        return self._decode(container)

-    def __len__(self):
+    def get_progress(self, pos):
         container = self._get_av_container()
-        # Not for all containers return real value
-        length = container.streams.video[0].frames
-        return length
-
-    def __getitem__(self, k):
-        return next(itertools.islice(self, k, k + 1))
+        stream = container.streams.video[0]
+        return pos / stream.duration if stream.duration else None

     def _get_av_container(self):
         return av.open(av.datasets.curated(self._source_path[0]))

-    def save_preview(self, preview_path):
+    def get_preview(self):
         container = self._get_av_container()
         stream = container.streams.video[0]
         preview = next(container.decode(stream))
-        preview.to_image().save(preview_path)
-
-    @property
-    def image_names(self):
-        return self._source_path
+        return self._get_preview(preview.to_image())

     def get_image_size(self):
         image = (next(iter(self)))[0]
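
For illustration (not part of the commit): VideoReader now filters decoded frames with a start/stop/step predicate instead of zipping frames with image names. The same predicate in isolation:

```python
def has_frame(i, start=0, step=1, stop=None):
    # Keep frame i when it is at or past start, lands on the step grid,
    # and precedes stop (when a stop is set) - mirroring _has_frame above.
    return i >= start and (i - start) % step == 0 and (stop is None or i < stop)

kept = [i for i in range(12) if has_frame(i, start=2, step=3, stop=9)]
assert kept == [2, 5, 8]
```

Note that get_progress divides image.pts by stream.duration, both expressed in the stream's time_base, and returns None when the container does not report a duration.
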
@@ -266,7 +257,7 @@ class IChunkWriter(ABC):
             image = Image.fromarray(im_data.astype(np.int32))
             converted_image = image.convert('RGB')
             image.close()
-            buf = BytesIO()
+            buf = io.BytesIO()
             converted_image.save(buf, format='JPEG', quality=quality, optimize=True)
             buf.seek(0)
             width, height = converted_image.size
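
For illustration (not part of the commit): the in-memory JPEG pattern used by _compress_image, shown standalone with a synthetic image:

```python
import io
from PIL import Image

image = Image.new('RGB', (64, 48), color='gray')  # synthetic stand-in
buf = io.BytesIO()
image.save(buf, format='JPEG', quality=75, optimize=True)
buf.seek(0)  # buf now holds the encoded JPEG bytes
width, height = image.size  # (64, 48)
```
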
@@ -280,9 +271,9 @@
 class ZipChunkWriter(IChunkWriter):
     def save_as_chunk(self, images, chunk_path):
         with zipfile.ZipFile(chunk_path, 'x') as zip_chunk:
-            for idx, (image, image_name) in enumerate(images):
-                arcname = '{:06d}{}'.format(idx, os.path.splitext(image_name)[1])
-                if isinstance(image, BytesIO):
+            for idx, (image, path, _) in enumerate(images):
+                arcname = '{:06d}{}'.format(idx, os.path.splitext(path)[1])
+                if isinstance(image, io.BytesIO):
                     zip_chunk.writestr(arcname, image.getvalue())
                 else:
                     zip_chunk.write(filename=image, arcname=arcname)
@@ -294,7 +285,7 @@ class ZipCompressedChunkWriter(IChunkWriter):
     def save_as_chunk(self, images, chunk_path):
         image_sizes = []
         with zipfile.ZipFile(chunk_path, 'x') as zip_chunk:
-            for idx, (image, _) in enumerate(images):
+            for idx, (image, _ , _) in enumerate(images):
                 w, h, image_buf = self._compress_image(image, self._image_quality)
                 image_sizes.append((w, h))
                 arcname = '{:06d}.jpeg'.format(idx)
@@ -344,7 +335,7 @@ class Mpeg4ChunkWriter(IChunkWriter):
     @staticmethod
     def _encode_images(images, container, stream):
-        for frame, _ in images:
+        for frame, _, _ in images:
             # let libav set the correct pts and time_base
             frame.pts = None
             frame.time_base = None

@@ -68,14 +68,18 @@ def migrate_task_data(db_task_id, db_data_id, original_video, original_images, s
             original_chunk_writer = Mpeg4ChunkWriter(100)
             compressed_chunk_writer = ZipCompressedChunkWriter(image_quality)

-            for chunk_idx, chunk_images in enumerate(reader.slice_by_size(chunk_size)):
+            counter = itertools.count()
+            generator = itertools.groupby(reader, lambda x: next(counter) // chunk_size)
+            for chunk_idx, chunk_images in generator:
+                chunk_images = list(chunk_images)
                 original_chunk_path = os.path.join(original_cache_dir, '{}.mp4'.format(chunk_idx))
                 original_chunk_writer.save_as_chunk(chunk_images, original_chunk_path)

                 compressed_chunk_path = os.path.join(compressed_cache_dir, '{}.zip'.format(chunk_idx))
                 compressed_chunk_writer.save_as_chunk(chunk_images, compressed_chunk_path)

-            reader.save_preview(os.path.join(db_data_dir, 'preview.jpeg'))
+            preview = reader.get_preview()
+            preview.save(os.path.join(db_data_dir, 'preview.jpeg'))
         else:
             original_chunk_writer = ZipChunkWriter(100)
             for chunk_idx, chunk_image_ids in enumerate(slice_by_size(range(size), chunk_size)):
@@ -131,14 +135,18 @@ def migrate_task_data(db_task_id, db_data_id, original_video, original_images, s
             original_chunk_writer = ZipChunkWriter(100)
             compressed_chunk_writer = ZipCompressedChunkWriter(image_quality)

-            for chunk_idx, chunk_images in enumerate(reader.slice_by_size(chunk_size)):
+            counter = itertools.count()
+            generator = itertools.groupby(reader, lambda x: next(counter) // chunk_size)
+            for chunk_idx, chunk_images in generator:
+                chunk_images = list(chunk_images)
                 compressed_chunk_path = os.path.join(compressed_cache_dir, '{}.zip'.format(chunk_idx))
                 compressed_chunk_writer.save_as_chunk(chunk_images, compressed_chunk_path)

                 original_chunk_path = os.path.join(original_cache_dir, '{}.zip'.format(chunk_idx))
                 original_chunk_writer.save_as_chunk(chunk_images, original_chunk_path)

-            reader.save_preview(os.path.join(db_data_dir, 'preview.jpeg'))
+            preview = reader.get_preview()
+            preview.save(os.path.join(db_data_dir, 'preview.jpeg'))

         shutil.rmtree(old_db_task_dir)
         return_dict[db_task_id] = (True, '')
     except Exception as e:
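
For illustration (not part of the commit): both migration branches replace reader.slice_by_size() with the same itertools.groupby idiom, where a shared counter maps consecutive items to chunk indices. Standalone:

```python
import itertools

def chunked(iterable, chunk_size):
    counter = itertools.count()
    grouped = itertools.groupby(iterable, lambda _: next(counter) // chunk_size)
    for chunk_idx, chunk in grouped:
        yield chunk_idx, list(chunk)

assert list(chunked(range(7), 3)) == [(0, [0, 1, 2]), (1, [3, 4, 5]), (2, [6])]
```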

@@ -3,6 +3,7 @@
 #
 # SPDX-License-Identifier: MIT

+import itertools
 import os
 import sys
 import rq
@@ -237,15 +238,25 @@ def _create_thread(tid, data):
         source_path=[os.path.join(upload_dir, f) for f in media_files],
         step=db_data.get_frame_step(),
         start=db_data.start_frame,
-        stop=db_data.stop_frame,
+        stop=data['stop_frame'],
     )

     db_task.mode = task_mode
     db_data.compressed_chunk_type = models.DataChoice.VIDEO if task_mode == 'interpolation' and not data['use_zip_chunks'] else models.DataChoice.IMAGESET
     db_data.original_chunk_type = models.DataChoice.VIDEO if task_mode == 'interpolation' else models.DataChoice.IMAGESET

     def update_progress(progress):
-        job.meta['status'] = 'Images are being compressed... {}%'.format(round(progress * 100))
+        progress_animation = '|/-\\'
+        if not hasattr(update_progress, 'call_counter'):
+            update_progress.call_counter = 0
+
+        status_template = 'Images are being compressed {}'
+        if progress:
+            current_progress = '{}%'.format(round(progress * 100))
+        else:
+            current_progress = '{}'.format(progress_animation[update_progress.call_counter])
+
+        job.meta['status'] = status_template.format(current_progress)
         job.save_meta()
+        update_progress.call_counter = (update_progress.call_counter + 1) % len(progress_animation)

     compressed_chunk_writer_class = Mpeg4CompressedChunkWriter if db_data.compressed_chunk_type == DataChoice.VIDEO else ZipCompressedChunkWriter
     original_chunk_writer_class = Mpeg4ChunkWriter if db_data.original_chunk_type == DataChoice.VIDEO else ZipChunkWriter
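
For illustration (not part of the commit): update_progress now falls back to a spinner when no progress fraction is available (for example, a video whose container reports no duration). The same fallback in isolation:

```python
def make_status(progress, call_counter, animation='|/-\\'):
    # Percentage when a fraction is known, otherwise a rotating spinner char.
    if progress:
        return 'Images are being compressed {}%'.format(round(progress * 100))
    return 'Images are being compressed {}'.format(animation[call_counter % len(animation)])

assert make_status(0.42, 0) == 'Images are being compressed 42%'
assert make_status(None, 5) == 'Images are being compressed /'
```
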
@@ -262,45 +273,52 @@ def _create_thread(tid, data):
     else:
         db_data.chunk_size = 36

-    frame_counter = 0
-    total_len = len(extractor) or 100
-    image_names = []
-    image_sizes = []
-    for chunk_idx, chunk_images in enumerate(extractor.slice_by_size(db_data.chunk_size)):
-        for img in chunk_images:
-            image_names.append(img[1])
+    video_path = ""
+    video_size = (0, 0)

+    counter = itertools.count()
+    generator = itertools.groupby(extractor, lambda x: next(counter) // db_data.chunk_size)
+    for chunk_idx, chunk_data in generator:
+        chunk_data = list(chunk_data)
         original_chunk_path = db_data.get_original_chunk_path(chunk_idx)
-        original_chunk_writer.save_as_chunk(chunk_images, original_chunk_path)
+        original_chunk_writer.save_as_chunk(chunk_data, original_chunk_path)

         compressed_chunk_path = db_data.get_compressed_chunk_path(chunk_idx)
-        img_sizes = compressed_chunk_writer.save_as_chunk(chunk_images, compressed_chunk_path)
-        image_sizes.extend(img_sizes)
+        img_sizes = compressed_chunk_writer.save_as_chunk(chunk_data, compressed_chunk_path)
+
+        if db_task.mode == 'annotation':
+            db_images.extend([
+                models.Image(
+                    data=db_data,
+                    path=os.path.relpath(data[1], upload_dir),
+                    frame=data[2],
+                    width=size[0],
+                    height=size[1])
+                for data, size in zip(chunk_data, img_sizes)
+            ])
+        else:
+            video_size = img_sizes[0]
+            video_path = chunk_data[0][1]

-        db_data.size += len(chunk_images)
-        update_progress(db_data.size / total_len)
+        db_data.size += len(chunk_data)
+        progress = extractor.get_progress(chunk_data[-1][2])
+        update_progress(progress)

     if db_task.mode == 'annotation':
-        for image_name, image_size in zip(image_names, image_sizes):
-            db_images.append(models.Image(
-                data=db_data,
-                path=os.path.relpath(image_name, upload_dir),
-                frame=frame_counter,
-                width=image_size[0],
-                height=image_size[1],
-            ))
-            frame_counter += 1
         models.Image.objects.bulk_create(db_images)
         db_images = []
     else:
         models.Video.objects.create(
             data=db_data,
-            path=os.path.relpath(image_names[0], upload_dir),
-            width=image_sizes[0][0], height=image_sizes[0][1])
-        if db_data.stop_frame == 0:
-            db_data.stop_frame = db_data.start_frame + (db_data.size - 1) * db_data.get_frame_step()
+            path=os.path.relpath(video_path, upload_dir),
+            width=video_size[0], height=video_size[1])
+
+    if db_data.stop_frame == 0:
+        db_data.stop_frame = db_data.start_frame + (db_data.size - 1) * db_data.get_frame_step()

-    extractor.save_preview(db_data.get_preview_path())
+    preview = extractor.get_preview()
+    preview.save(db_data.get_preview_path())

     slogger.glob.info("Founded frames {} for Data #{}".format(db_data.size, db_data.id))
     _save_task_to_db(db_task)
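
For illustration (not part of the commit): when no stop frame is stored, the last kept frame index follows from the counted size as stop_frame = start_frame + (size - 1) * step. A quick check:

```python
start_frame, size, step = 3, 5, 2
stop_frame = start_frame + (size - 1) * step  # kept frames: 3, 5, 7, 9, 11
assert stop_frame == 11
```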

@@ -1641,7 +1641,7 @@ class TaskDataAPITestCase(APITestCase):
         self.assertEqual(response.status_code, expected_status_code)
         if expected_status_code == status.HTTP_200_OK:
             preview = Image.open(io.BytesIO(b"".join(response.streaming_content)))
-            self.assertEqual(preview.size, image_sizes[0])
+            self.assertLessEqual(preview.size, image_sizes[0])

             # check compressed chunk
             response = self._get_compressed_chunk(task_id, user, 0)
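
For illustration (not part of the commit): the equality assertion is relaxed because get_preview now thumbnails to at most 128x128 while preserving aspect ratio, so the preview can only be smaller than or equal to the source frame:

```python
from PIL import Image

img = Image.new('RGB', (640, 480))
img.thumbnail((128, 128))        # shrinks in place, keeps aspect ratio
assert img.size == (128, 96)     # fits inside the 128x128 bounding box
assert img.size <= (640, 480)    # tuple comparison, as in the test
```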

@@ -415,6 +415,10 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
             db_task.save()
             data = {k:v for k, v in serializer.data.items()}
             data['use_zip_chunks'] = serializer.validated_data['use_zip_chunks']
+            # if stop_frame is 0, inside the function we cannot tell whether
+            # the user specified it or it is the default value from the database
+            if 'stop_frame' not in serializer.validated_data:
+                data['stop_frame'] = None
             task.create(db_task.id, data)
             return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
         else:
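
For illustration (not part of the commit): the view uses None as a sentinel so that an explicit stop_frame = 0 ("first frame only") stays distinguishable from an absent field ("read to the end"). A hypothetical resolver showing the rule:

```python
def resolve_stop_frame(validated_data):
    # None means "not specified"; 0 is a legitimate explicit value.
    return validated_data['stop_frame'] if 'stop_frame' in validated_data else None

assert resolve_stop_frame({'stop_frame': 0}) == 0
assert resolve_stop_frame({}) is None
```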
