Added most of video processing & cache implementation

main
Maya 6 years ago
parent 0f7f14a1ef
commit 5fa50be1fa

@ -285,6 +285,19 @@ class ZipChunkWriter(IChunkWriter):
# and does not decode it to know img size.
return []
def save_as_chunk_to_buff(self, images, format_='jpeg'):
buff = io.BytesIO()
with zipfile.ZipFile(buff, 'w') as zip_file:
for idx, image in enumerate(images):
arcname = '{:06d}.{}'.format(idx, format_)
if isinstance(image, av.VideoFrame):
image_buf = io.BytesIO()
image.to_image().save(image_buf, format=format_)
zip_file.writestr(arcname, image_buf.getvalue())
else:
zip_file.write(filename=image, arcname=arcname)
buff.seek(0)
return buff
class ZipCompressedChunkWriter(IChunkWriter):
def save_as_chunk(self, images, chunk_path):
image_sizes = []
@ -297,20 +310,30 @@ class ZipCompressedChunkWriter(IChunkWriter):
return image_sizes
def save_as_chunk_to_buff(self, images, format_='jpeg'):
buff = io.BytesIO()
with zipfile.ZipFile(buff, 'w') as zip_file:
for idx, image in enumerate(images):
(_, _, image_buf) = self._compress_image(image, self._image_quality)
arcname = '{:06d}.{}'.format(idx, format_)
zip_file.writestr(arcname, image_buf.getvalue())
buff.seek(0)
return buff
class Mpeg4ChunkWriter(IChunkWriter):
def __init__(self, _):
super().__init__(17)
self._output_fps = 25
@staticmethod
def _create_av_container(path, w, h, rate, options):
def _create_av_container(path, w, h, rate, options, f=None):
# x264 requires width and height must be divisible by 2 for yuv420p
if h % 2:
h += 1
if w % 2:
w += 1
container = av.open(path, 'w')
container = av.open(path, 'w', format=f)
video_stream = container.add_stream('libx264', rate=rate)
video_stream.pix_fmt = "yuv420p"
video_stream.width = w
@ -341,6 +364,41 @@ class Mpeg4ChunkWriter(IChunkWriter):
output_container.close()
return [(input_w, input_h)]
def save_as_chunk_to_buff(self, frames, format_):
if not frames:
raise Exception('no images to save')
buff = io.BytesIO()
input_w = frames[0].width
input_h = frames[0].height
output_container, output_v_stream = self._create_av_container(
path=buff,
w=input_w,
h=input_h,
rate=self._output_fps,
options={
"crf": str(self._image_quality),
"preset": "ultrafast",
},
f=format_,
)
for frame in frames:
# let libav set the correct pts and time_base
frame.pts = None
frame.time_base = None
for packet in output_v_stream.encode(frame):
output_container.mux(packet)
# Flush streams
for packet in output_v_stream.encode():
output_container.mux(packet)
output_container.close()
buff.seek(0)
return buff
@staticmethod
def _encode_images(images, container, stream):
for frame, _, _ in images:

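A minimal usage sketch for the new in-memory writers above, assuming they are importable from cvat.apps.engine.media_extractors; the input path, the chunk size of 36 and the quality value of 100 are illustrative, not values the code above mandates:

import av
from cvat.apps.engine.media_extractors import Mpeg4ChunkWriter, ZipChunkWriter

# decode one chunk's worth of frames from a source video
container = av.open('input.mp4', mode='r')
video_stream = container.streams.video[0]
frames = []
for packet in container.demux(video_stream):
    for frame in packet.decode():
        frames.append(frame)
    if len(frames) >= 36:
        break
container.close()

# re-encode the decoded frames into an in-memory mp4 chunk
mp4_buff = Mpeg4ChunkWriter(100).save_as_chunk_to_buff(frames[:36], format_='mp4')

# or pack the same frames into an in-memory zip of jpegs
zip_buff = ZipChunkWriter(100).save_as_chunk_to_buff(frames[:36], format_='jpeg')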
@ -102,6 +102,9 @@ class Data(models.Model):
def get_preview_path(self):
return os.path.join(self.get_data_dirname(), 'preview.jpeg')
def get_meta_path(self):
return os.path.join(self.get_upload_dirname(), 'meta_info.txt')
class Video(models.Model):
data = models.OneToOneField(Data, on_delete=models.CASCADE, related_name="video", null=True)
path = models.CharField(max_length=1024, default='')

@ -1,10 +1,11 @@
import av
import hashlib
class PrepareInfo:
def __init__(self, source_path, meta_path):
self.source_path = source_path
self.meta_path = meta_path
class WorkWithVideo:
def __init__(self, **kwargs):
if not kwargs.get('source_path'):
raise Exception('No source path')
self.source_path = kwargs.get('source_path')
def _open_video_container(self, source_path, mode, options=None):
return av.open(source_path, mode=mode, options=options)
@ -17,22 +18,114 @@ class PrepareInfo:
video_stream.thread_type = 'AUTO'
return video_stream
class AnalyzeVideo(WorkWithVideo):
def check_type_first_frame(self):
container = self._open_video_container(self.source_path, mode='r')
video_stream = self._get_video_stream(container)
for packet in container.demux(video_stream):
for frame in packet.decode():
self._close_video_container(container)
assert frame.pict_type.name == 'I', 'First frame is not a key frame'
return
def check_video_timestamps_sequences(self):
container = self._open_video_container(self.source_path, mode='r')
video_stream = self._get_video_stream(container)
frame_pts = -1
frame_dts = -1
for packet in container.demux(video_stream):
for frame in packet.decode():
if None not in [frame.pts, frame_pts] and frame.pts <= frame_pts:
self._close_video_container(container)
raise Exception('Invalid pts sequences')
if None not in [frame.dts, frame_dts] and frame.dts <= frame_dts:
self._close_video_container(container)
raise Exception('Invalid dts sequences')
frame_pts, frame_dts = frame.pts, frame.dts
self._close_video_container(container)
# class Frame:
# def __init__(self, frame, frame_number=None):
# self.frame = frame
# if frame_number:
# self.frame_number = frame_number
# def md5_hash(self):
# return hashlib.md5(self.frame.to_image().tobytes()).hexdigest()
# def __eq__(self, image):
# return self.md5_hash(self) == image.md5_hash(image) and self.frame.pts == image.frame.pts
# def __ne__(self, image):
# return md5_hash(self) != md5_hash(image) or self.frame.pts != image.frame.pts
# def __len__(self):
# return (self.frame.width, self.frame.height)
def md5_hash(frame):
return hashlib.md5(frame.to_image().tobytes()).hexdigest()
class PrepareInfo(WorkWithVideo):
def __init__(self, **kwargs):
super().__init__(**kwargs)
if not kwargs.get('meta_path'):
raise Exception('No meta path')
self.meta_path = kwargs.get('meta_path')
self.key_frames = {}
self.frames = 0
def get_task_size(self):
return self.frames
def check_seek_key_frames(self):
container = self._open_video_container(self.source_path, mode='r')
video_stream = self._get_video_stream(container)
key_frames_copy = self.key_frames.copy()
for index, key_frame in key_frames_copy.items():
container.seek(offset=key_frame.pts, stream=video_stream)
flag = True
for packet in container.demux(video_stream):
for frame in packet.decode():
if md5_hash(frame) != md5_hash(key_frame) or frame.pts != key_frame.pts:
self.key_frames.pop(index)
flag = False
break
if not flag:
break
if len(self.key_frames) == 0: #or self.frames // len(self.key_frames) > 300:
raise Exception('Too few keyframes')
def save_key_frames(self):
container = self._open_video_container(self.source_path, mode='r')
video_stream = self._get_video_stream(container)
frame_number = 0
for packet in container.demux(video_stream):
for frame in packet.decode():
if frame.key_frame:
self.key_frames[frame_number] = frame
frame_number += 1
self.frames = frame_number
self._close_video_container(container)
return frame_number  # == task_size
def save_meta_info(self):
with open(self.meta_path, 'w') as meta_file:
for index, frame in self.key_frames.items():
meta_file.write('{} {}\n'.format(index, frame.pts))
def get_nearest_left_key_frame(self, start_chunk_frame_number):
start_decode_frame_number = 0
@ -52,10 +145,9 @@ class PrepareInfo:
return int(start_decode_frame_number), int(start_decode_timestamp)
def decode_needed_frames(self, chunk_number, chunk_size):
start_chunk_frame_number = (chunk_number - 1) * chunk_size + 1
end_chunk_frame_number = start_chunk_frame_number + chunk_size #- 1
start_chunk_frame_number = chunk_number * chunk_size
end_chunk_frame_number = start_chunk_frame_number + chunk_size
start_decode_frame_number, start_decode_timestamp = self.get_nearest_left_key_frame(start_chunk_frame_number)
extra_frames = start_chunk_frame_number - start_decode_frame_number
container = self._open_video_container(self.source_path, mode='r')
video_stream = self._get_video_stream(container)
@ -73,4 +165,4 @@ class PrepareInfo:
self._close_video_container(container)
return
self._close_video_container(container)
self._close_video_container(container)

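A sketch of how the reworked prepare.py classes chain together, assuming they are importable from cvat.apps.engine.prepare; the file names and the chunk size are placeholders:

from cvat.apps.engine.prepare import AnalyzeVideo, PrepareInfo

analyzer = AnalyzeVideo(source_path='video.mp4')
analyzer.check_type_first_frame()            # AssertionError if the first frame is not an I-frame
analyzer.check_video_timestamps_sequences()  # Exception on non-monotonic pts/dts

meta = PrepareInfo(source_path='video.mp4', meta_path='meta_info.txt')
meta.save_key_frames()        # index key frames and count total frames
meta.check_seek_key_frames()  # drop key frames that cannot be reached reliably by seeking
meta.save_meta_info()         # persist "<frame number> <pts>" lines

# later, decode only the frames that belong to chunk 0
frames = list(meta.decode_needed_frames(chunk_number=0, chunk_size=36))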
@ -24,7 +24,7 @@ from distutils.dir_util import copy_tree
from . import models
from .log import slogger
from .prepare import PrepareInfo
from .prepare import PrepareInfo, AnalyzeVideo
from diskcache import Cache
############################# Low Level server API
@ -232,22 +232,6 @@ def _create_thread(tid, data):
job.meta['status'] = 'Media files are being extracted...'
job.save_meta()
if settings.USE_CACHE:
for media_type, media_files in media.items():
if media_files:
if task_mode == MEDIA_TYPES['video']['mode']:
meta_info = PrepareInfo(source_path=os.path.join(upload_dir, media_files[0]),
meta_path=os.path.join(upload_dir, 'meta_info.txt'))
meta_info.save_meta_info()
# else:
# with Cache(settings.CACHE_ROOT) as cache:
# counter_ = itertools.count(start=1)
#TODO: chunk size
# for chunk_number, media_paths in itertools.groupby(media_files, lambda x: next(counter_) // db_data.chunk_size):
# cache.set('{}_{}'.format(tid, chunk_number), media_paths, tag='dummy')
#else:
db_images = []
extractor = None
@ -294,37 +278,91 @@ def _create_thread(tid, data):
else:
db_data.chunk_size = 36
# it would be better to add this field to the Task model
video_suitable_on_the_fly_processing = True
video_path = ""
video_size = (0, 0)
counter = itertools.count()
generator = itertools.groupby(extractor, lambda x: next(counter) // db_data.chunk_size)
for chunk_idx, chunk_data in generator:
chunk_data = list(chunk_data)
original_chunk_path = db_data.get_original_chunk_path(chunk_idx)
original_chunk_writer.save_as_chunk(chunk_data, original_chunk_path)
compressed_chunk_path = db_data.get_compressed_chunk_path(chunk_idx)
img_sizes = compressed_chunk_writer.save_as_chunk(chunk_data, compressed_chunk_path)
if db_task.mode == 'annotation':
db_images.extend([
models.Image(
data=db_data,
path=os.path.relpath(data[1], upload_dir),
frame=data[2],
width=size[0],
height=size[1])
for data, size in zip(chunk_data, img_sizes)
])
else:
video_size = img_sizes[0]
video_path = chunk_data[0][1]
if settings.USE_CACHE:
for media_type, media_files in media.items():
if media_files:
if task_mode == MEDIA_TYPES['video']['mode']:
try:
analyzer = AnalyzeVideo(source_path=os.path.join(upload_dir, media_files[0]))
analyzer.check_type_first_frame()
analyzer.check_video_timestamps_sequences()
meta_info = PrepareInfo(source_path=os.path.join(upload_dir, media_files[0]),
meta_path=os.path.join(upload_dir, 'meta_info.txt'))
meta_info.save_key_frames()
#meta_info.test_seek()
meta_info.check_seek_key_frames()
meta_info.save_meta_info()
db_data.size = meta_info.get_task_size()
video_path = os.path.join(upload_dir, media_files[0])
frame = meta_info.key_frames.get(next(iter(meta_info.key_frames)))
video_size = (frame.width, frame.height)
except Exception:
video_suitable_on_the_fly_processing = False
else: # images, TODO: archive
with Cache(settings.CACHE_ROOT) as cache:
counter_ = itertools.count()
for chunk_number, media_paths in itertools.groupby(media_files, lambda x: next(counter_) // db_data.chunk_size):
media_paths = list(media_paths)
cache.set('{}_{}'.format(tid, chunk_number), [os.path.join(upload_dir, file_name) for file_name in media_paths], tag='dummy')
img_sizes = []
from PIL import Image
for media_path in media_paths:
img_sizes += [Image.open(os.path.join(upload_dir, media_path)).size]
db_data.size += len(media_paths)
db_images.extend([
models.Image(
data=db_data,
path=data[1],
frame=data[0],
width=size[0],
height=size[1])
for data, size in zip(enumerate(media_paths), img_sizes)
])
if (db_task.mode == 'interpolation' and not video_suitable_on_the_fly_processing) or not settings.USE_CACHE:
counter = itertools.count()
generator = itertools.groupby(extractor, lambda x: next(counter) // db_data.chunk_size)
for chunk_idx, chunk_data in generator:
chunk_data = list(chunk_data)
original_chunk_path = db_data.get_original_chunk_path(chunk_idx)
original_chunk_writer.save_as_chunk(chunk_data, original_chunk_path)
compressed_chunk_path = db_data.get_compressed_chunk_path(chunk_idx)
img_sizes = compressed_chunk_writer.save_as_chunk(chunk_data, compressed_chunk_path)
if db_task.mode == 'annotation':
db_images.extend([
models.Image(
data=db_data,
path=os.path.relpath(data[1], upload_dir),
frame=data[2],
width=size[0],
height=size[1])
for data, size in zip(chunk_data, img_sizes)
])
else:
video_size = img_sizes[0]
video_path = chunk_data[0][1]
db_data.size += len(chunk_data)
progress = extractor.get_progress(chunk_data[-1][2])
update_progress(progress)
if db_task.mode == 'annotation':
models.Image.objects.bulk_create(db_images)
@ -342,4 +380,4 @@ def _create_thread(tid, data):
preview.save(db_data.get_preview_path())
slogger.glob.info("Founded frames {} for Data #{}".format(db_data.size, db_data.id))
_save_task_to_db(db_task)
_save_task_to_db(db_task)

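The chunk grouping in task.py pairs itertools.count with itertools.groupby so that every chunk_size consecutive items share a bucket index; a self-contained illustration of that trick:

import itertools

chunk_size = 3
items = ['a', 'b', 'c', 'd', 'e', 'f', 'g']

counter = itertools.count()
for chunk_idx, chunk in itertools.groupby(items, lambda x: next(counter) // chunk_size):
    print(chunk_idx, list(chunk))
# 0 ['a', 'b', 'c']
# 1 ['d', 'e', 'f']
# 2 ['g']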
@ -29,7 +29,7 @@ from rest_framework.exceptions import APIException
from rest_framework.permissions import SAFE_METHODS, IsAuthenticated
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from sendfile import sendfile
from sendfile import sendfile  # TODO: remove
import cvat.apps.dataset_manager as dm
import cvat.apps.dataset_manager.views # pylint: disable=unused-import
@ -37,7 +37,7 @@ from cvat.apps.authentication import auth
from cvat.apps.authentication.decorators import login_required
from cvat.apps.dataset_manager.serializers import DatasetFormatsSerializer
from cvat.apps.engine.frame_provider import FrameProvider
from cvat.apps.engine.models import Job, Plugin, StatusChoice, Task
from cvat.apps.engine.models import Job, Plugin, StatusChoice, Task, DataChoice
from cvat.apps.engine.serializers import (
AboutSerializer, AnnotationFileSerializer, BasicUserSerializer,
DataMetaSerializer, DataSerializer, ExceptionSerializer,
@ -49,6 +49,12 @@ from cvat.apps.engine.utils import av_scan_paths
from . import models, task
from .log import clogger, slogger
from .media_extractors import (
Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter,
ZipCompressedChunkWriter, ZipChunkWriter)
from .prepare import PrepareInfo
from diskcache import Cache
#from cvat.apps.engine.mime_types import mimetypes
# drf-yasg component doesn't handle correctly URL_FORMAT_OVERRIDE and
@ -437,13 +443,63 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
try:
db_task = self.get_object()
db_data = db_task.data
frame_provider = FrameProvider(db_task.data)
if data_type == 'chunk':
data_id = int(data_id)
quality = data_quality
data_quality = FrameProvider.Quality.COMPRESSED \
if data_quality == 'compressed' else FrameProvider.Quality.ORIGINAL
path = os.path.realpath(frame_provider.get_chunk(data_id, data_quality))
#TODO: av.FFmpegError processing
if settings.USE_CACHE:
with Cache(settings.CACHE_ROOT) as cache:
buff = None
chunk, tag = cache.get('{}_{}_{}'.format(db_task.id, data_id, quality), tag=True)
if not chunk:
extractor_classes = {
'compressed' : Mpeg4CompressedChunkWriter if db_data.compressed_chunk_type == DataChoice.VIDEO else ZipCompressedChunkWriter,
'original' : Mpeg4ChunkWriter if db_data.original_chunk_type == DataChoice.VIDEO else ZipChunkWriter,
}
image_quality = 100 if extractor_classes[quality] in [Mpeg4ChunkWriter, ZipChunkWriter] else db_data.image_quality
file_extension = 'mp4' if extractor_classes[quality] in [Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter] else 'jpeg'
mime_type = 'video/mp4' if extractor_classes[quality] in [Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter] else 'application/zip'
extractor = extractor_classes[quality](image_quality)
if 'interpolation' == db_task.mode:
meta = PrepareInfo(source_path=os.path.join(db_data.get_upload_dirname(), db_data.video.path),
meta_path=db_data.get_meta_path())
frames = []
for frame in meta.decode_needed_frames(data_id, db_data.chunk_size):
frames.append(frame)
buff = extractor.save_as_chunk_to_buff(frames,
format_=file_extension)
cache.set('{}_{}_{}'.format(db_task.id, data_id, quality), buff, tag=mime_type)
else:
img_paths = cache.get('{}_{}'.format(db_task.id, data_id))
buff = extractor.save_as_chunk_to_buff(img_paths,
format_=file_extension)
cache.set('{}_{}_{}'.format(db_task.id, data_id, quality), buff, tag=mime_type)
elif 'process_creating' == tag:
pass
else:
buff, mime_type = cache.get('{}_{}_{}'.format(db_task.id, data_id, quality), tag=True)
return HttpResponse(buff.getvalue(), content_type=mime_type)
# Follow symbol links if the chunk is a link on a real image otherwise
# mimetype detection inside sendfile will work incorrectly.

@ -49,3 +49,4 @@ av==6.2.0
# The package is used by pyunpack as a command line tool to support multiple
# archives. Don't use as a python module because it has GPL license.
patool==1.12
diskcache==4.1.0
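diskcache is the shared on-disk store behind the new chunk caching; a minimal sketch of the key/tag scheme that task.py and views.py rely on, with an illustrative cache directory and task id, and with raw bytes stored instead of the BytesIO object the view code caches:

from diskcache import Cache

with Cache('/tmp/cvat_cache') as cache:
    # task.py side: remember which uploaded images form chunk 0 of task 42
    cache.set('42_0', ['/data/42/raw/000001.jpg', '/data/42/raw/000002.jpg'], tag='dummy')

    # views.py side: cache an already encoded chunk, carrying the mime type in the tag
    cache.set('42_0_compressed', b'<zip bytes>', tag='application/zip')

    # read it back together with the tag
    chunk_bytes, mime_type = cache.get('42_0_compressed', tag=True)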