Some additions:

* moved cache implementation
* fixed start, stop, step for tasks created on the fly
* changed frame provider
main
Maya, 6 years ago
parent 94bb3659c9
commit f3630aa4b5

@@ -0,0 +1,61 @@
from diskcache import Cache

from django.conf import settings

from cvat.apps.engine.media_extractors import (Mpeg4ChunkWriter, ZipChunkWriter,
    Mpeg4CompressedChunkWriter, ZipCompressedChunkWriter)
from cvat.apps.engine.models import DataChoice
from .prepare import PrepareInfo

import os

class CacheInteraction:
    def __init__(self):
        self._cache = Cache(settings.CACHE_ROOT)

    def __del__(self):
        self._cache.close()

    def get_buff_mime(self, chunk_number, quality, db_data):
        chunk, tag = self._cache.get('{}_{}_{}'.format(db_data.id, chunk_number, quality), tag=True)

        if not chunk:
            chunk, tag = self.prepare_chunk_buff(db_data, quality, chunk_number)
            self.save_chunk(db_data.id, chunk_number, quality, chunk, tag)
        return chunk, tag

    def get_buff(self, chunk_number, quality, db_data):
        chunk, tag = self._cache.get('{}_{}_{}'.format(db_data.id, chunk_number, quality), tag=True)

        if not chunk:
            chunk, tag = self.prepare_chunk_buff(db_data, quality, chunk_number)
            self.save_chunk(db_data.id, chunk_number, quality, chunk, tag)
        return chunk

    def prepare_chunk_buff(self, db_data, quality, chunk_number):
        from cvat.apps.engine.frame_provider import FrameProvider
        extractor_classes = {
            FrameProvider.Quality.COMPRESSED: Mpeg4CompressedChunkWriter if db_data.compressed_chunk_type == DataChoice.VIDEO else ZipCompressedChunkWriter,
            FrameProvider.Quality.ORIGINAL: Mpeg4ChunkWriter if db_data.original_chunk_type == DataChoice.VIDEO else ZipChunkWriter,
        }
        image_quality = 100 if extractor_classes[quality] in [Mpeg4ChunkWriter, ZipChunkWriter] else db_data.image_quality
        file_extension = 'mp4' if extractor_classes[quality] in [Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter] else 'jpeg'
        mime_type = 'video/mp4' if extractor_classes[quality] in [Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter] else 'application/zip'
        extractor = extractor_classes[quality](image_quality)

        if os.path.exists(db_data.get_meta_path()):
            meta = PrepareInfo(source_path=os.path.join(db_data.get_upload_dirname(), db_data.video.path),
                meta_path=db_data.get_meta_path())
            frames = []
            for frame in meta.decode_needed_frames(chunk_number, db_data):
                frames.append(frame)
            buff = extractor.save_as_chunk_to_buff(frames, file_extension)
        else:
            img_paths = None
            with open(db_data.get_dummy_chunk_path(chunk_number), 'r') as dummy_file:
                img_paths = [line.strip() for line in dummy_file]
            buff = extractor.save_as_chunk_to_buff(img_paths, file_extension)
        return buff, mime_type

    def save_chunk(self, db_data_id, chunk_number, quality, buff, mime_type):
        self._cache.set('{}_{}_{}'.format(db_data_id, chunk_number, quality), buff, tag=mime_type)
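The caching contract above is just diskcache's tag feature: the chunk bytes are stored under a `{data_id}_{chunk_number}_{quality}` key, with the mime type riding along as the tag. A minimal, self-contained sketch of that round trip (temporary directory standing in for `settings.CACHE_ROOT`, key values assumed):

```python
import tempfile

from diskcache import Cache

with Cache(tempfile.mkdtemp()) as cache:
    key = '{}_{}_{}'.format(42, 0, 'compressed')  # db_data.id, chunk_number, quality

    # A miss with tag=True comes back as (None, None).
    buff, mime = cache.get(key, tag=True)
    assert buff is None and mime is None

    # Store the prepared chunk bytes with the mime type as the tag...
    cache.set(key, b'chunk-bytes', tag='application/zip')

    # ...so a single lookup recovers both payload and content type.
    buff, mime = cache.get(key, tag=True)
    print(mime)  # application/zip
```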

@@ -11,8 +11,8 @@ from PIL import Image
from cvat.apps.engine.media_extractors import VideoReader, ZipReader
from cvat.apps.engine.mime_types import mimetypes
from cvat.apps.engine.models import DataChoice
from cvat.apps.engine.models import DataChoice, StorageMethodChoice
from .cache import CacheInteraction

class RandomAccessIterator:
    def __init__(self, iterable):
@@ -65,6 +65,20 @@ class FrameProvider:
                self.reader_class([self.get_chunk_path(chunk_id)]))
            return self.chunk_reader

    class BuffChunkLoader(ChunkLoader):
        def __init__(self, reader_class, path_getter, buff_mime_getter, quality, db_data):
            super().__init__(reader_class, path_getter)
            self.get_chunk = buff_mime_getter
            self.quality = quality
            self.db_data = db_data

        def load(self, chunk_id):
            if self.chunk_id != chunk_id:
                self.chunk_id = chunk_id
                self.chunk_reader = RandomAccessIterator(
                    self.reader_class([self.get_chunk_path(chunk_id, self.quality, self.db_data)]))
            return self.chunk_reader

    def __init__(self, db_data):
        self._db_data = db_data
        self._loaders = {}
@@ -73,12 +87,29 @@ class FrameProvider:
            DataChoice.IMAGESET: ZipReader,
            DataChoice.VIDEO: VideoReader,
        }
        self._loaders[self.Quality.COMPRESSED] = self.ChunkLoader(
            reader_class[db_data.compressed_chunk_type],
            db_data.get_compressed_chunk_path)
        self._loaders[self.Quality.ORIGINAL] = self.ChunkLoader(
            reader_class[db_data.original_chunk_type],
            db_data.get_original_chunk_path)
        if db_data.storage_method == StorageMethodChoice.CACHE:
            cache = CacheInteraction()

            self._loaders[self.Quality.COMPRESSED] = self.BuffChunkLoader(
                reader_class[db_data.compressed_chunk_type],
                cache.get_buff,
                cache.get_buff_mime,
                self.Quality.COMPRESSED,
                self._db_data)
            self._loaders[self.Quality.ORIGINAL] = self.BuffChunkLoader(
                reader_class[db_data.original_chunk_type],
                cache.get_buff,
                cache.get_buff_mime,
                self.Quality.ORIGINAL,
                self._db_data)
        else:
            self._loaders[self.Quality.COMPRESSED] = self.ChunkLoader(
                reader_class[db_data.compressed_chunk_type],
                db_data.get_compressed_chunk_path)
            self._loaders[self.Quality.ORIGINAL] = self.ChunkLoader(
                reader_class[db_data.original_chunk_type],
                db_data.get_original_chunk_path)

    def __len__(self):
        return self._db_data.size
@@ -129,6 +160,8 @@ class FrameProvider:
    def get_chunk(self, chunk_number, quality=Quality.ORIGINAL):
        chunk_number = self._validate_chunk_number(chunk_number)
        if self._db_data.storage_method == StorageMethodChoice.CACHE:
            return self._loaders[quality].get_chunk(chunk_number, quality, self._db_data)
        return self._loaders[quality].get_chunk_path(chunk_number)

    def get_frame(self, frame_number, quality=Quality.ORIGINAL,
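One note on the wiring above: `BuffChunkLoader` never talks to the cache class directly. It receives two callables from `CacheInteraction` — `get_buff` (passed as the `path_getter`) feeds the chunk reader in `load()`, while `get_buff_mime` backs `FrameProvider.get_chunk()`. A toy sketch of that injection with the cache faked (all values made up):

```python
import io

class FakeCache:
    """Stand-in for CacheInteraction with the same two-callable surface."""
    def get_buff(self, chunk_number, quality, db_data):
        return io.BytesIO(b'zip-or-mp4-bytes')  # what a chunk reader would open

    def get_buff_mime(self, chunk_number, quality, db_data):
        return self.get_buff(chunk_number, quality, db_data), 'application/zip'

cache = FakeCache()
buff, mime = cache.get_buff_mime(chunk_number=0, quality='compressed', db_data=None)
print(mime, len(buff.getvalue()))  # application/zip 16
```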

@@ -190,7 +190,10 @@ class ZipReader(ImageListReader):
        return io.BytesIO(self._zip_source.read(self._source_path[i]))

    def get_path(self, i):
        return os.path.join(os.path.dirname(self._zip_source.filename), self._source_path[i])
        if self._zip_source.filename:
            return os.path.join(os.path.dirname(self._zip_source.filename), self._source_path[i])
        else: # needed to determine the mime_type
            return self._source_path[i]

    def extract(self):
        self._zip_source.extractall(os.path.dirname(self._zip_source.filename))

@@ -117,6 +117,9 @@ class Data(models.Model):
    def get_meta_path(self):
        return os.path.join(self.get_upload_dirname(), 'meta_info.txt')

    def get_dummy_chunk_path(self, chunk_number):
        return os.path.join(self.get_upload_dirname(), 'dummy_{}.txt'.format(chunk_number))

class Video(models.Model):
    data = models.OneToOneField(Data, on_delete=models.CASCADE, related_name="video", null=True)
    path = models.CharField(max_length=1024, default='')
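`get_dummy_chunk_path()` points at a plain-text manifest: task creation writes one image path per line (see the `task.py` hunk below), and `CacheInteraction.prepare_chunk_buff()` reads it back when the chunk is first requested. A small round-trip sketch with made-up paths:

```python
import os
import tempfile

upload_dir = tempfile.mkdtemp()
chunk_number = 0
# Stands in for db_data.get_dummy_chunk_path(chunk_number).
dummy_path = os.path.join(upload_dir, 'dummy_{}.txt'.format(chunk_number))

# Writer side (task creation): one absolute image path per line.
with open(dummy_path, 'w') as dummy_chunk:
    for name in ['img_0001.jpg', 'img_0002.jpg']:
        dummy_chunk.write(os.path.join(upload_dir, name) + '\n')

# Reader side (cache miss): recover the paths for the chunk writer.
with open(dummy_path, 'r') as dummy_file:
    img_paths = [line.strip() for line in dummy_file]
print(img_paths)
```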

@@ -144,11 +144,11 @@ class PrepareInfo(WorkWithVideo):
        return int(start_decode_frame_number), int(start_decode_timestamp)

    def decode_needed_frames(self, chunk_number, chunk_size):
        start_chunk_frame_number = chunk_number * chunk_size
        end_chunk_frame_number = start_chunk_frame_number + chunk_size
    def decode_needed_frames(self, chunk_number, db_data):
        step = db_data.get_frame_step()
        start_chunk_frame_number = db_data.start_frame + chunk_number * db_data.chunk_size * step
        end_chunk_frame_number = min(start_chunk_frame_number + (db_data.chunk_size - 1) * step + 1, db_data.stop_frame + 1)
        start_decode_frame_number, start_decode_timestamp = self.get_nearest_left_key_frame(start_chunk_frame_number)

        container = self._open_video_container(self.source_path, mode='r')
        video_stream = self._get_video_stream(container)
        container.seek(offset=start_decode_timestamp, stream=video_stream)

@@ -159,8 +159,10 @@ class PrepareInfo(WorkWithVideo):
            frame_number += 1
            if frame_number < start_chunk_frame_number:
                continue
            elif frame_number < end_chunk_frame_number:
            elif frame_number < end_chunk_frame_number and not (frame_number % step):
                yield frame
            elif frame_number % step:
                continue
            else:
                self._close_video_container(container)
                return
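The new boundary math is worth a concrete check. A standalone example with assumed values (`start_frame=5`, `stop_frame=20`, `chunk_size=4`, `step=2`) that reproduces the start/end computation and the `frame_number % step` filter above:

```python
start_frame, stop_frame, chunk_size, step = 5, 20, 4, 2

for chunk_number in range(3):
    start = start_frame + chunk_number * chunk_size * step
    end = min(start + (chunk_size - 1) * step + 1, stop_frame + 1)
    # Same filter as the generator: keep frames divisible by step.
    frames = [n for n in range(start, end) if not (n % step)]
    print(chunk_number, (start, end), frames)
# 0 (5, 12) [6, 8, 10]
# 1 (13, 20) [14, 16, 18]
# 2 (21, 21) []
```

Note that the modulo test is against the absolute frame number, so when `start_frame` is not itself a multiple of `step`, the selected frames are the multiples of `step` inside each window rather than `start_frame`, `start_frame + step`, and so on.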

@@ -25,7 +25,6 @@ from distutils.dir_util import copy_tree
from . import models
from .log import slogger
from .prepare import PrepareInfo, AnalyzeVideo
from diskcache import Cache

############################# Low Level server API
@@ -299,7 +298,8 @@ def _create_thread(tid, data):
        meta_info.check_seek_key_frames()
        meta_info.save_meta_info()

        db_data.size = meta_info.get_task_size()
        all_frames = meta_info.get_task_size()
        db_data.size = len(range(db_data.start_frame, min(data['stop_frame'] + 1 if data['stop_frame'] else all_frames, all_frames), db_data.get_frame_step()))

        video_path = os.path.join(upload_dir, media_files[0])
        frame = meta_info.key_frames.get(next(iter(meta_info.key_frames)))
        video_size = (frame.width, frame.height)

@@ -310,31 +310,33 @@ def _create_thread(tid, data):
            db_data.storage_method = StorageMethodChoice.FILE_SYSTEM
    else: # images, archive
        with Cache(settings.CACHE_ROOT) as cache:
            counter_ = itertools.count()
            if extractor.__class__ in [MEDIA_TYPES['archive']['extractor'], MEDIA_TYPES['zip']['extractor']]:
                media_files = [os.path.join(upload_dir, f) for f in extractor._source_path]

            for chunk_number, media_paths in itertools.groupby(media_files, lambda x: next(counter_) // db_data.chunk_size):
                media_paths = list(media_paths)
                cache.set('{}_{}'.format(tid, chunk_number), [os.path.join(upload_dir, file_name) for file_name in media_paths], tag='dummy')

                img_sizes = []
                from PIL import Image
                for media_path in media_paths:
                    img_sizes += [Image.open(media_path).size]

                db_data.size += len(media_paths)
                db_images.extend([
                    models.Image(
                        data=db_data,
                        path=os.path.basename(data[1]),
                        frame=data[0],
                        width=size[0],
                        height=size[1])
                    for data, size in zip(enumerate(media_paths, start=len(db_images)), img_sizes)
                ])
        counter_ = itertools.count()
        if extractor.__class__ in [MEDIA_TYPES['archive']['extractor'], MEDIA_TYPES['zip']['extractor']]:
            media_files = [os.path.join(upload_dir, f) for f in extractor._source_path]

        numbers_sequence = range(db_data.start_frame, min(data['stop_frame'] if data['stop_frame'] else len(media_files), len(media_files)), db_data.get_frame_step())
        m_paths = []
        m_paths = [(path, numb) for numb, path in enumerate(media_files) if numb in numbers_sequence]

        for chunk_number, media_paths in itertools.groupby(m_paths, lambda x: next(counter_) // db_data.chunk_size):
            media_paths = list(media_paths)
            img_sizes = []
            from PIL import Image
            with open(db_data.get_dummy_chunk_path(chunk_number), 'w') as dummy_chunk:
                for path, _ in media_paths:
                    dummy_chunk.write(os.path.join(upload_dir, path) + '\n')
                    img_sizes += [Image.open(os.path.join(upload_dir, path)).size]

            db_data.size += len(media_paths)
            db_images.extend([
                models.Image(
                    data=db_data,
                    path=os.path.basename(data[0]),
                    frame=data[1],
                    width=size[0],
                    height=size[1])
                for data, size in zip(media_paths, img_sizes)
            ])

    if db_data.storage_method == StorageMethodChoice.FILE_SYSTEM or not settings.USE_CACHE:
        counter = itertools.count()
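The chunk grouping relies on a small `itertools` idiom: `groupby` with a shared counter in the key function slices a flat, already-filtered list into fixed-size runs. A standalone sketch with made-up file names and the same start/stop/step pre-filtering as above:

```python
import itertools

chunk_size = 3
media_files = ['img_{:04d}.jpg'.format(i) for i in range(8)]

# Frame selection mirrors the patch: honor start_frame/stop_frame/step first.
start_frame, stop_frame, step = 0, 8, 2
numbers_sequence = range(start_frame, min(stop_frame, len(media_files)), step)
m_paths = [(path, numb) for numb, path in enumerate(media_files) if numb in numbers_sequence]

# The lambda ignores its argument: each element just advances the counter,
# so consecutive elements share a key until chunk_size of them have passed.
counter_ = itertools.count()
for chunk_number, media_paths in itertools.groupby(m_paths, lambda x: next(counter_) // chunk_size):
    print(chunk_number, list(media_paths))
# 0 [('img_0000.jpg', 0), ('img_0002.jpg', 2), ('img_0004.jpg', 4)]
# 1 [('img_0006.jpg', 6)]
```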

@@ -37,7 +37,7 @@ from cvat.apps.authentication import auth
from cvat.apps.authentication.decorators import login_required
from cvat.apps.dataset_manager.serializers import DatasetFormatsSerializer
from cvat.apps.engine.frame_provider import FrameProvider
from cvat.apps.engine.models import Job, Plugin, StatusChoice, Task, DataChoice, StorageMethodChoice
from cvat.apps.engine.models import Job, Plugin, StatusChoice, Task, StorageMethodChoice
from cvat.apps.engine.serializers import (
    AboutSerializer, AnnotationFileSerializer, BasicUserSerializer,
    DataMetaSerializer, DataSerializer, ExceptionSerializer,

@@ -49,13 +49,6 @@ from cvat.apps.engine.utils import av_scan_paths
from . import models, task
from .log import clogger, slogger
from .media_extractors import (
    Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter,
    ZipCompressedChunkWriter, ZipChunkWriter)
from .prepare import PrepareInfo
from diskcache import Cache
#from cvat.apps.engine.mime_types import mimetypes

# drf-yasg component doesn't handle correctly URL_FORMAT_OVERRIDE and
# send requests with ?format=openapi suffix instead of ?scheme=openapi.
@@ -458,56 +451,14 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
                data_quality = FrameProvider.Quality.COMPRESSED \
                    if data_quality == 'compressed' else FrameProvider.Quality.ORIGINAL

                path = os.path.realpath(frame_provider.get_chunk(data_id, data_quality))
                #TODO: av.FFmpegError processing
                if settings.USE_CACHE and db_data.storage_method == StorageMethodChoice.CACHE:
                    with Cache(settings.CACHE_ROOT) as cache:
                        buff = None
                        chunk, tag = cache.get('{}_{}_{}'.format(db_task.id, data_id, quality), tag=True)
                        if not chunk:
                            extractor_classes = {
                                'compressed': Mpeg4CompressedChunkWriter if db_data.compressed_chunk_type == DataChoice.VIDEO else ZipCompressedChunkWriter,
                                'original': Mpeg4ChunkWriter if db_data.original_chunk_type == DataChoice.VIDEO else ZipChunkWriter,
                            }
                            image_quality = 100 if extractor_classes[quality] in [Mpeg4ChunkWriter, ZipChunkWriter] else db_data.image_quality
                            file_extension = 'mp4' if extractor_classes[quality] in [Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter] else 'jpeg'
                            mime_type = 'video/mp4' if extractor_classes[quality] in [Mpeg4ChunkWriter, Mpeg4CompressedChunkWriter] else 'application/zip'
                            extractor = extractor_classes[quality](image_quality)
                            if 'interpolation' == db_task.mode:
                                meta = PrepareInfo(source_path=os.path.join(db_data.get_upload_dirname(), db_data.video.path),
                                    meta_path=db_data.get_meta_path())
                                frames = []
                                for frame in meta.decode_needed_frames(data_id, db_data.chunk_size):
                                    frames.append(frame)
                                buff = extractor.save_as_chunk_to_buff(frames,
                                    format_=file_extension)
                                cache.set('{}_{}_{}'.format(db_task.id, data_id, quality), buff, tag=mime_type)
                            else:
                                img_paths = cache.get('{}_{}'.format(db_task.id, data_id))
                                buff = extractor.save_as_chunk_to_buff(img_paths,
                                    format_=file_extension)
                                cache.set('{}_{}_{}'.format(db_task.id, data_id, quality), buff, tag=mime_type)
                        elif 'process_creating' == tag:
                            pass
                        else:
                            buff, mime_type = cache.get('{}_{}_{}'.format(db_task.id, data_id, quality), tag=True)
                        return HttpResponse(buff.getvalue(), content_type=mime_type)
                    buff, mime_type = frame_provider.get_chunk(data_id, data_quality)
                    return HttpResponse(buff.getvalue(), content_type=mime_type)

                # Follow symbol links if the chunk is a link on a real image otherwise
                # mimetype detection inside sendfile will work incorrectly.
                path = os.path.realpath(frame_provider.get_chunk(data_id, data_quality))
                return sendfile(request, path)
            elif data_type == 'frame':
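After this cleanup the view no longer knows how chunks are produced; it only branches on the storage method. A condensed sketch of the resulting control flow (the helper name `chunk_response` is made up; the two branches mirror the code above):

```python
import os

from django.conf import settings
from django.http import HttpResponse
from sendfile import sendfile

from cvat.apps.engine.models import StorageMethodChoice

def chunk_response(request, frame_provider, db_data, data_id, data_quality):
    # Hypothetical helper condensing the view logic above.
    if settings.USE_CACHE and db_data.storage_method == StorageMethodChoice.CACHE:
        # Cache-backed tasks: FrameProvider returns (BytesIO, mime_type).
        buff, mime_type = frame_provider.get_chunk(data_id, data_quality)
        return HttpResponse(buff.getvalue(), content_type=mime_type)
    # File-system tasks: FrameProvider returns a path; resolve symlinks so
    # mimetype detection inside sendfile works on the real file.
    path = os.path.realpath(frame_provider.get_chunk(data_id, data_quality))
    return sendfile(request, path)
```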
