From 5fa50be1fad0b42c3d86ff6fd96ef21bd3810a72 Mon Sep 17 00:00:00 2001
From: Maya
Date: Mon, 10 Aug 2020 09:54:56 +0300
Subject: [PATCH] Add most of the video processing & cache implementation

---
 cvat/apps/engine/media_extractors.py |  62 ++++++++++++-
 cvat/apps/engine/models.py           |   3 +
 cvat/apps/engine/prepare.py          | 128 +++++++++++++++++++++++----
 cvat/apps/engine/task.py             | 128 +++++++++++++++++----------
 cvat/apps/engine/views.py            |  60 ++++++++++++-
 cvat/requirements/base.txt           |   1 +
 6 files changed, 315 insertions(+), 67 deletions(-)

diff --git a/cvat/apps/engine/media_extractors.py b/cvat/apps/engine/media_extractors.py
index dea14183..c2bc020f 100644
--- a/cvat/apps/engine/media_extractors.py
+++ b/cvat/apps/engine/media_extractors.py
@@ -285,6 +285,19 @@ class ZipChunkWriter(IChunkWriter):
         # and does not decode it to know img size.
         return []
 
+    def save_as_chunk_to_buff(self, images, format_='jpeg'):
+        buff = io.BytesIO()
+
+        with zipfile.ZipFile(buff, 'w') as zip_file:
+            for idx, image in enumerate(images):
+                arcname = '{:06d}.{}'.format(idx, format_)
+                if isinstance(image, av.VideoFrame):
+                    # Encode the decoded frame before archiving it;
+                    # raw tobytes() output would not be a valid image file.
+                    image_buf = io.BytesIO()
+                    image.to_image().save(image_buf, format=format_)
+                    zip_file.writestr(arcname, image_buf.getvalue())
+                else:
+                    zip_file.write(filename=image, arcname=arcname)
+        buff.seek(0)
+        return buff
+
 class ZipCompressedChunkWriter(IChunkWriter):
     def save_as_chunk(self, images, chunk_path):
         image_sizes = []
@@ -297,20 +310,30 @@ class ZipCompressedChunkWriter(IChunkWriter):
 
         return image_sizes
 
+    def save_as_chunk_to_buff(self, images, format_='jpeg'):
+        buff = io.BytesIO()
+        with zipfile.ZipFile(buff, 'w') as zip_file:
+            for idx, image in enumerate(images):
+                (_, _, image_buf) = self._compress_image(image, self._image_quality)
+                arcname = '{:06d}.{}'.format(idx, format_)
+                zip_file.writestr(arcname, image_buf.getvalue())
+        buff.seek(0)
+        return buff
+
 class Mpeg4ChunkWriter(IChunkWriter):
     def __init__(self, _):
         super().__init__(17)
         self._output_fps = 25
 
     @staticmethod
-    def _create_av_container(path, w, h, rate, options):
+    def _create_av_container(path, w, h, rate, options, f=None):
         # x264 requires width and height must be divisible by 2 for yuv420p
         if h % 2:
             h += 1
         if w % 2:
             w += 1
 
-        container = av.open(path, 'w')
+        container = av.open(path, 'w', format=f)
         video_stream = container.add_stream('libx264', rate=rate)
         video_stream.pix_fmt = "yuv420p"
         video_stream.width = w
@@ -341,6 +364,41 @@ class Mpeg4ChunkWriter(IChunkWriter):
         output_container.close()
         return [(input_w, input_h)]
 
+    def save_as_chunk_to_buff(self, frames, format_):
+        if not frames:
+            raise Exception('No frames to save')
+
+        buff = io.BytesIO()
+        input_w = frames[0].width
+        input_h = frames[0].height
+
+        output_container, output_v_stream = self._create_av_container(
+            path=buff,
+            w=input_w,
+            h=input_h,
+            rate=self._output_fps,
+            options={
+                "crf": str(self._image_quality),
+                "preset": "ultrafast",
+            },
+            f=format_,
+        )
+
+        for frame in frames:
+            # let libav set the correct pts and time_base
+            frame.pts = None
+            frame.time_base = None
+
+            for packet in output_v_stream.encode(frame):
+                output_container.mux(packet)
+
+        # Flush streams
+        for packet in output_v_stream.encode():
+            output_container.mux(packet)
+        output_container.close()
+        buff.seek(0)
+        return buff
+
     @staticmethod
     def _encode_images(images, container, stream):
         for frame, _, _ in images:
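Note: PyAV only writes into a file-like object such as io.BytesIO when the container format is passed explicitly, which is what the new f parameter of _create_av_container is for. A minimal standalone sketch of the same in-memory encoding pattern (the function name is illustrative; assumes PyAV ~6.2 and a non-empty list of av.VideoFrame objects):

    import io
    import av

    def frames_to_mp4_buffer(frames, fps=25, crf=17):
        buff = io.BytesIO()
        # an explicit format is required because a BytesIO carries no file extension
        container = av.open(buff, 'w', format='mp4')
        stream = container.add_stream('libx264', rate=fps)
        stream.pix_fmt = 'yuv420p'
        stream.width = frames[0].width    # assumed even; x264 needs even
        stream.height = frames[0].height  # dimensions for yuv420p
        stream.options = {'crf': str(crf), 'preset': 'ultrafast'}
        for frame in frames:
            frame.pts = None  # let libav assign increasing timestamps
            for packet in stream.encode(frame):
                container.mux(packet)
        for packet in stream.encode():  # flush packets buffered in the encoder
            container.mux(packet)
        container.close()
        buff.seek(0)  # rewind so the caller can read the buffer from the start
        return buff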
diff --git a/cvat/apps/engine/models.py b/cvat/apps/engine/models.py
index d4c46eb3..d2f8e26e 100644
--- a/cvat/apps/engine/models.py
+++ b/cvat/apps/engine/models.py
@@ -102,6 +102,9 @@ class Data(models.Model):
     def get_preview_path(self):
         return os.path.join(self.get_data_dirname(), 'preview.jpeg')
 
+    def get_meta_path(self):
+        return os.path.join(self.get_upload_dirname(), 'meta_info.txt')
+
 class Video(models.Model):
     data = models.OneToOneField(Data, on_delete=models.CASCADE, related_name="video", null=True)
     path = models.CharField(max_length=1024, default='')
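get_meta_path points at the meta_info.txt file that PrepareInfo.save_meta_info in prepare.py below writes, with one "<frame_number> <pts>" pair per key frame and line. A hypothetical reader for that format, just to make the layout concrete:

    def read_meta_info(meta_path):
        # each line of meta_info.txt is '<frame_number> <pts>' for one key frame
        key_frames = {}
        with open(meta_path) as meta_file:
            for line in meta_file:
                frame_number, pts = map(int, line.split())
                key_frames[frame_number] = pts
        return key_frames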
diff --git a/cvat/apps/engine/prepare.py b/cvat/apps/engine/prepare.py
index 18beaa80..01d1c367 100644
--- a/cvat/apps/engine/prepare.py
+++ b/cvat/apps/engine/prepare.py
@@ -1,10 +1,11 @@
 import av
+import hashlib
 
-class PrepareInfo:
-
-    def __init__(self, source_path, meta_path):
-        self.source_path = source_path
-        self.meta_path = meta_path
+class WorkWithVideo:
+    def __init__(self, **kwargs):
+        if not kwargs.get('source_path'):
+            raise Exception('No source path')
+        self.source_path = kwargs.get('source_path')
 
     def _open_video_container(self, source_path, mode, options=None):
         return av.open(source_path, mode=mode, options=options)
@@ -17,22 +18,114 @@ class PrepareInfo:
         video_stream.thread_type = 'AUTO'
         return video_stream
 
-    #@get_execution_time
-    def save_meta_info(self):
-        container = self._open_video_container(self.source_path, mode='r')
-        video_stream = self._get_video_stream(container)
-
-        with open(self.meta_path, 'w') as file:
-            frame_number = 0
-
-            for packet in container.demux(video_stream):
-                for frame in packet.decode():
-                    frame_number += 1
-                    if frame.key_frame:
-                        file.write('{} {}\n'.format(frame_number, frame.pts))
-
-        self._close_video_container(container)
-        return frame_number # == task_size
+
+class AnalyzeVideo(WorkWithVideo):
+    def check_type_first_frame(self):
+        container = self._open_video_container(self.source_path, mode='r')
+        video_stream = self._get_video_stream(container)
+
+        for packet in container.demux(video_stream):
+            for frame in packet.decode():
+                self._close_video_container(container)
+                assert frame.pict_type.name == 'I', 'First frame is not a key frame'
+                return
+
+    def check_video_timestamps_sequences(self):
+        container = self._open_video_container(self.source_path, mode='r')
+        video_stream = self._get_video_stream(container)
+
+        frame_pts = -1
+        frame_dts = -1
+        for packet in container.demux(video_stream):
+            for frame in packet.decode():
+                if None not in [frame.pts, frame_pts] and frame.pts <= frame_pts:
+                    self._close_video_container(container)
+                    raise Exception('Invalid pts sequence')
+
+                if None not in [frame.dts, frame_dts] and frame.dts <= frame_dts:
+                    self._close_video_container(container)
+                    raise Exception('Invalid dts sequence')
+
+                frame_pts, frame_dts = frame.pts, frame.dts
+        self._close_video_container(container)
+
+
+def md5_hash(frame):
+    return hashlib.md5(frame.to_image().tobytes()).hexdigest()
+
+
+class PrepareInfo(WorkWithVideo):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+        if not kwargs.get('meta_path'):
+            raise Exception('No meta path')
+
+        self.meta_path = kwargs.get('meta_path')
+        self.key_frames = {}
+        self.frames = 0
+
+    def get_task_size(self):
+        return self.frames
+
+    def check_seek_key_frames(self):
+        container = self._open_video_container(self.source_path, mode='r')
+        video_stream = self._get_video_stream(container)
+
+        key_frames_copy = self.key_frames.copy()
+
+        for index, key_frame in key_frames_copy.items():
+            container.seek(offset=key_frame.pts, stream=video_stream)
+            flag = True
+            for packet in container.demux(video_stream):
+                for frame in packet.decode():
+                    if md5_hash(frame) != md5_hash(key_frame) or frame.pts != key_frame.pts:
+                        # seeking did not land on this key frame, so it cannot be used
+                        self.key_frames.pop(index)
+                    flag = False
+                    break
+                if not flag:
+                    break
+        self._close_video_container(container)
+
+        if len(self.key_frames) == 0: # TODO: maybe also require a minimal key frame density, e.g. self.frames // len(self.key_frames) <= 300
+            raise Exception('Too few key frames')
+
+    def save_key_frames(self):
+        container = self._open_video_container(self.source_path, mode='r')
+        video_stream = self._get_video_stream(container)
+        frame_number = 0
+
+        for packet in container.demux(video_stream):
+            for frame in packet.decode():
+                if frame.key_frame:
+                    self.key_frames[frame_number] = frame
+                frame_number += 1
+
+        self.frames = frame_number
+        self._close_video_container(container)
+
+    def save_meta_info(self):
+        with open(self.meta_path, 'w') as meta_file:
+            for index, frame in self.key_frames.items():
+                meta_file.write('{} {}\n'.format(index, frame.pts))
 
     def get_nearest_left_key_frame(self, start_chunk_frame_number):
         start_decode_frame_number = 0
@@ -52,10 +145,9 @@ class PrepareInfo:
         return int(start_decode_frame_number), int(start_decode_timestamp)
 
     def decode_needed_frames(self, chunk_number, chunk_size):
-        start_chunk_frame_number = (chunk_number - 1) * chunk_size + 1
-        end_chunk_frame_number = start_chunk_frame_number + chunk_size #- 1
+        # chunk numbers are zero-based now, so no +1/-1 adjustments are needed
+        start_chunk_frame_number = chunk_number * chunk_size
+        end_chunk_frame_number = start_chunk_frame_number + chunk_size
         start_decode_frame_number, start_decode_timestamp = self.get_nearest_left_key_frame(start_chunk_frame_number)
-        extra_frames = start_chunk_frame_number - start_decode_frame_number
 
         container = self._open_video_container(self.source_path, mode='r')
         video_stream = self._get_video_stream(container)
@@ -73,4 +165,4 @@ class PrepareInfo:
                 self._close_video_container(container)
                 return
 
-        self._close_video_container(container)
+        self._close_video_container(container)
\ No newline at end of file
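The intended call sequence for these classes, mirrored from task.py and views.py below (the file paths here are illustrative):

    from cvat.apps.engine.prepare import AnalyzeVideo, PrepareInfo

    analyzer = AnalyzeVideo(source_path='video.mp4')
    analyzer.check_type_first_frame()            # the first frame must be a key frame
    analyzer.check_video_timestamps_sequences()  # pts/dts must grow monotonically

    meta_info = PrepareInfo(source_path='video.mp4', meta_path='meta_info.txt')
    meta_info.save_key_frames()        # index every key frame in the video
    meta_info.check_seek_key_frames()  # drop key frames that seeking cannot reach
    meta_info.save_meta_info()         # persist '<frame_number> <pts>' pairs

    # serving time: decode only the frames that belong to one chunk
    for frame in meta_info.decode_needed_frames(chunk_number=0, chunk_size=36):
        print(frame.pts)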
diff --git a/cvat/apps/engine/task.py b/cvat/apps/engine/task.py
index 804ab1b4..b468d4ef 100644
--- a/cvat/apps/engine/task.py
+++ b/cvat/apps/engine/task.py
@@ -24,7 +24,7 @@ from distutils.dir_util import copy_tree
 from . import models
 from .log import slogger
-from .prepare import PrepareInfo
+from .prepare import PrepareInfo, AnalyzeVideo
 from diskcache import Cache
 
 ############################# Low Level server API
@@ -232,22 +232,6 @@ def _create_thread(tid, data):
     job.meta['status'] = 'Media files are being extracted...'
     job.save_meta()
 
-    if settings.USE_CACHE:
-        for media_type, media_files in media.items():
-            if media_files:
-                if task_mode == MEDIA_TYPES['video']['mode']:
-                    meta_info = PrepareInfo(source_path=os.path.join(upload_dir, media_files[0]),
-                                            meta_path=os.path.join(upload_dir, 'meta_info.txt'))
-                    meta_info.save_meta_info()
-                # else:
-                #     with Cache(settings.CACHE_ROOT) as cache:
-                #         counter_ = itertools.count(start=1)
-
-                    #TODO: chunk size
-                    # for chunk_number, media_paths in itertools.groupby(media_files, lambda x: next(counter_) // db_data.chunk_size):
-                    #     cache.set('{}_{}'.format(tid, chunk_number), media_paths, tag='dummy')
-    #else:
-
     db_images = []
     extractor = None
 
@@ -294,37 +278,91 @@ def _create_thread(tid, data):
         else:
             db_data.chunk_size = 36
 
+    # TODO: it would be better to store this flag as a field on the Task model
+    video_suitable_on_the_fly_processing = True
+
     video_path = ""
     video_size = (0, 0)
 
-    counter = itertools.count()
-    generator = itertools.groupby(extractor, lambda x: next(counter) // db_data.chunk_size)
-    for chunk_idx, chunk_data in generator:
-        chunk_data = list(chunk_data)
-        original_chunk_path = db_data.get_original_chunk_path(chunk_idx)
-        original_chunk_writer.save_as_chunk(chunk_data, original_chunk_path)
-
-        compressed_chunk_path = db_data.get_compressed_chunk_path(chunk_idx)
-        img_sizes = compressed_chunk_writer.save_as_chunk(chunk_data, compressed_chunk_path)
-
-        if db_task.mode == 'annotation':
-            db_images.extend([
-                models.Image(
-                    data=db_data,
-                    path=os.path.relpath(data[1], upload_dir),
-                    frame=data[2],
-                    width=size[0],
-                    height=size[1])
-
-                for data, size in zip(chunk_data, img_sizes)
-            ])
-        else:
-            video_size = img_sizes[0]
-            video_path = chunk_data[0][1]
+    if settings.USE_CACHE:
+        for media_type, media_files in media.items():
+            if media_files:
+                if task_mode == MEDIA_TYPES['video']['mode']:
+                    try:
+                        analyzer = AnalyzeVideo(source_path=os.path.join(upload_dir, media_files[0]))
+                        analyzer.check_type_first_frame()
+                        analyzer.check_video_timestamps_sequences()
+
+                        meta_info = PrepareInfo(source_path=os.path.join(upload_dir, media_files[0]),
+                                                meta_path=os.path.join(upload_dir, 'meta_info.txt'))
+                        meta_info.save_key_frames()
+                        meta_info.check_seek_key_frames()
+                        meta_info.save_meta_info()
+
+                        db_data.size = meta_info.get_task_size()
+                        video_path = os.path.join(upload_dir, media_files[0])
+                        frame = meta_info.key_frames.get(next(iter(meta_info.key_frames)))
+                        video_size = (frame.width, frame.height)
+                    except Exception:
+                        # both the failed assertion from check_type_first_frame and the
+                        # exceptions raised by the other checks land here
+                        video_suitable_on_the_fly_processing = False
+                else:  # images; TODO: support archives
+                    from PIL import Image  # TODO: move to the top of the module
+                    with Cache(settings.CACHE_ROOT) as cache:
+                        counter_ = itertools.count()
+
+                        for chunk_number, media_paths in itertools.groupby(media_files, lambda x: next(counter_) // db_data.chunk_size):
+                            media_paths = list(media_paths)
+                            cache.set('{}_{}'.format(tid, chunk_number),
+                                [os.path.join(upload_dir, file_name) for file_name in media_paths], tag='dummy')
+
+                            img_sizes = []
+                            for media_path in media_paths:
+                                img_sizes.append(Image.open(os.path.join(upload_dir, media_path)).size)
+
+                            db_data.size += len(media_paths)
+                            db_images.extend([
+                                models.Image(
+                                    data=db_data,
+                                    path=data[1],
+                                    # the frame number is global, not the index inside the chunk
+                                    frame=chunk_number * db_data.chunk_size + data[0],
+                                    width=size[0],
+                                    height=size[1])
+                                for data, size in zip(enumerate(media_paths), img_sizes)
+                            ])
+
+    if (db_task.mode == 'interpolation' and not video_suitable_on_the_fly_processing) or not settings.USE_CACHE:
+        counter = itertools.count()
+        generator = itertools.groupby(extractor, lambda x: next(counter) // db_data.chunk_size)
+        for chunk_idx, chunk_data in generator:
+            chunk_data = list(chunk_data)
+            original_chunk_path = db_data.get_original_chunk_path(chunk_idx)
+            original_chunk_writer.save_as_chunk(chunk_data, original_chunk_path)
+
+            compressed_chunk_path = db_data.get_compressed_chunk_path(chunk_idx)
+            img_sizes = compressed_chunk_writer.save_as_chunk(chunk_data, compressed_chunk_path)
+
+            if db_task.mode == 'annotation':
+                db_images.extend([
+                    models.Image(
+                        data=db_data,
+                        path=os.path.relpath(data[1], upload_dir),
+                        frame=data[2],
+                        width=size[0],
+                        height=size[1])
+
+                    for data, size in zip(chunk_data, img_sizes)
+                ])
+            else:
+                video_size = img_sizes[0]
+                video_path = chunk_data[0][1]
 
-        db_data.size += len(chunk_data)
-        progress = extractor.get_progress(chunk_data[-1][2])
-        update_progress(progress)
+            db_data.size += len(chunk_data)
+            progress = extractor.get_progress(chunk_data[-1][2])
+            update_progress(progress)
 
     if db_task.mode == 'annotation':
         models.Image.objects.bulk_create(db_images)
@@ -342,4 +380,4 @@ def _create_thread(tid, data):
         preview.save(db_data.get_preview_path())
 
     slogger.glob.info("Founded frames {} for Data #{}".format(db_data.size, db_data.id))
-    _save_task_to_db(db_task)
+    _save_task_to_db(db_task)
\ No newline at end of file
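The cache keys used above and in views.py below follow two conventions: task creation stores the source image paths of a chunk under '<tid>_<chunk_number>', and the view stores the rendered chunk buffer under '<tid>_<chunk_number>_<quality>' with the MIME type as the diskcache tag. A sketch of the convention (the directory and values are illustrative):

    from diskcache import Cache

    with Cache('/path/to/cache_root') as cache:  # settings.CACHE_ROOT in CVAT
        # written by task.py at task creation time:
        cache.set('42_0', ['/data/42/raw/000.png', '/data/42/raw/001.png'], tag='dummy')

        # written by views.py once the chunk has been rendered, e.g.:
        # cache.set('42_0_compressed', buff, tag='application/zip')

        # tag=True makes get() return a (value, tag) pair; both are None on a miss
        paths, tag = cache.get('42_0', tag=True)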
diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py
index 120125ea..621f85e8 100644
--- a/cvat/apps/engine/views.py
+++ b/cvat/apps/engine/views.py
@@ -29,7 +29,7 @@ from rest_framework.exceptions import APIException
 from rest_framework.permissions import SAFE_METHODS, IsAuthenticated
 from rest_framework.renderers import JSONRenderer
 from rest_framework.response import Response
-from sendfile import sendfile
+from sendfile import sendfile  # TODO: remove
 
 import cvat.apps.dataset_manager as dm
 import cvat.apps.dataset_manager.views # pylint: disable=unused-import
@@ -37,7 +37,7 @@ from cvat.apps.authentication import auth
 from cvat.apps.authentication.decorators import login_required
 from cvat.apps.dataset_manager.serializers import DatasetFormatsSerializer
 from cvat.apps.engine.frame_provider import FrameProvider
-from cvat.apps.engine.models import Job, Plugin, StatusChoice, Task
+from cvat.apps.engine.models import Job, Plugin, StatusChoice, Task, DataChoice
 from cvat.apps.engine.serializers import (
     AboutSerializer, AnnotationFileSerializer, BasicUserSerializer,
     DataMetaSerializer, DataSerializer, ExceptionSerializer,
@@ -49,6 +49,12 @@ from cvat.apps.engine.utils import av_scan_paths
 from . import models, task
 from .log import clogger, slogger
+from .media_extractors import (
+    Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter,
+    ZipCompressedChunkWriter, ZipChunkWriter)
+from .prepare import PrepareInfo
+from diskcache import Cache
 
 
 # drf-yasg component doesn't handle correctly URL_FORMAT_OVERRIDE and
@@ -437,13 +443,63 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
 
         try:
             db_task = self.get_object()
+            db_data = db_task.data
             frame_provider = FrameProvider(db_task.data)
 
             if data_type == 'chunk':
                 data_id = int(data_id)
+                quality = data_quality
                 data_quality = FrameProvider.Quality.COMPRESSED \
                     if data_quality == 'compressed' else FrameProvider.Quality.ORIGINAL
-                path = os.path.realpath(frame_provider.get_chunk(data_id, data_quality))
+
+                # TODO: handle av.FFmpegError
+                if settings.USE_CACHE:
+                    with Cache(settings.CACHE_ROOT) as cache:
+                        buff = None
+                        chunk, tag = cache.get('{}_{}_{}'.format(db_task.id, data_id, quality), tag=True)
+
+                        if not chunk:
+                            extractor_classes = {
+                                'compressed': Mpeg4CompressedChunkWriter if db_data.compressed_chunk_type == DataChoice.VIDEO else ZipCompressedChunkWriter,
+                                'original': Mpeg4ChunkWriter if db_data.original_chunk_type == DataChoice.VIDEO else ZipChunkWriter,
+                            }
+
+                            image_quality = 100 if extractor_classes[quality] in [Mpeg4ChunkWriter, ZipChunkWriter] else db_data.image_quality
+                            file_extension = 'mp4' if extractor_classes[quality] in [Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter] else 'jpeg'
+                            mime_type = 'video/mp4' if extractor_classes[quality] in [Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter] else 'application/zip'
+
+                            extractor = extractor_classes[quality](image_quality)
+
+                            if db_task.mode == 'interpolation':
+                                meta = PrepareInfo(source_path=os.path.join(db_data.get_upload_dirname(), db_data.video.path),
+                                    meta_path=db_data.get_meta_path())
+                                frames = list(meta.decode_needed_frames(data_id, db_data.chunk_size))
+                                buff = extractor.save_as_chunk_to_buff(frames, format_=file_extension)
+                            else:
+                                img_paths = cache.get('{}_{}'.format(db_task.id, data_id))
+                                buff = extractor.save_as_chunk_to_buff(img_paths, format_=file_extension)
+
+                            cache.set('{}_{}_{}'.format(db_task.id, data_id, quality), buff, tag=mime_type)
+                        elif tag == 'process_creating':
+                            # another worker is rendering this chunk right now;
+                            # tell the client to retry instead of returning an empty body
+                            return HttpResponse(status=202)
+                        else:
+                            buff, mime_type = chunk, tag
+
+                        return HttpResponse(buff.getvalue(), content_type=mime_type)
+
+                # Without the cache the chunk already exists on disk.
+                path = os.path.realpath(frame_provider.get_chunk(data_id, data_quality))
 
                 # Follow symbol links if the chunk is a link on a real image otherwise
                 # mimetype detection inside sendfile will work incorrectly.
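With USE_CACHE enabled, a chunk is now rendered lazily on its first request and memoized, and later requests are served straight from the cache. An illustrative client call (the URL shape and parameter names are assumed from CVAT's v1 REST API, not introduced by this patch):

    import requests

    resp = requests.get(
        'http://localhost:8080/api/v1/tasks/42/data',
        params={'type': 'chunk', 'number': 0, 'quality': 'compressed'},
        auth=('user', 'password'),
    )
    resp.raise_for_status()
    # the Content-Type mirrors the cache tag: application/zip for image
    # tasks, video/mp4 for videos processed on the fly
    with open('chunk_0', 'wb') as chunk_file:
        chunk_file.write(resp.content)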
diff --git a/cvat/requirements/base.txt b/cvat/requirements/base.txt
index b2847cbd..cc650f3f 100644
--- a/cvat/requirements/base.txt
+++ b/cvat/requirements/base.txt
@@ -49,3 +49,4 @@ av==6.2.0
 # The package is used by pyunpack as a command line tool to support multiple
 # archives. Don't use as a python module because it has GPL license.
 patool==1.12
+diskcache==4.1.0
\ No newline at end of file