Added tests

main
Maya 6 years ago
parent b47d3d5683
commit d29f4cb6e0

@ -72,13 +72,14 @@ import av
import numpy as np
from django.conf import settings
from django.contrib.auth.models import Group, User
from django.http import HttpResponse
from PIL import Image
from pycocotools import coco as coco_loader
from rest_framework import status
from rest_framework.test import APIClient, APITestCase
from cvat.apps.engine.models import (AttributeType, Data, Job, Project,
Segment, StatusChoice, Task)
Segment, StatusChoice, Task, StorageMethodChoice)
_setUpModule()
@ -1670,7 +1671,8 @@ class TaskDataAPITestCase(APITestCase):
stream = container.streams.video[0]
return [f.to_image() for f in container.decode(stream)]
def _test_api_v1_tasks_id_data_spec(self, user, spec, data, expected_compressed_type, expected_original_type, image_sizes):
def _test_api_v1_tasks_id_data_spec(self, user, spec, data, expected_compressed_type, expected_original_type, image_sizes,
expected_storage_method=StorageMethodChoice.FILE_SYSTEM):
# create task
response = self._create_task(user, spec)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
@ -1694,6 +1696,7 @@ class TaskDataAPITestCase(APITestCase):
self.assertEqual(expected_compressed_type, task["data_compressed_chunk_type"])
self.assertEqual(expected_original_type, task["data_original_chunk_type"])
self.assertEqual(len(image_sizes), task["size"])
self.assertEqual(expected_storage_method, Task.objects.get(pk=task_id).data.storage_method)
# check preview
response = self._get_preview(task_id, user)
@ -1706,7 +1709,10 @@ class TaskDataAPITestCase(APITestCase):
response = self._get_compressed_chunk(task_id, user, 0)
self.assertEqual(response.status_code, expected_status_code)
if expected_status_code == status.HTTP_200_OK:
compressed_chunk = io.BytesIO(b"".join(response.streaming_content))
if isinstance(response, HttpResponse):
compressed_chunk = io.BytesIO(response.content)
else:
compressed_chunk = io.BytesIO(b"".join(response.streaming_content))
if task["data_compressed_chunk_type"] == self.ChunkType.IMAGESET:
images = self._extract_zip_chunk(compressed_chunk)
else:
@ -1721,7 +1727,10 @@ class TaskDataAPITestCase(APITestCase):
response = self._get_original_chunk(task_id, user, 0)
self.assertEqual(response.status_code, expected_status_code)
if expected_status_code == status.HTTP_200_OK:
original_chunk = io.BytesIO(b"".join(response.streaming_content))
if isinstance(response, HttpResponse):
original_chunk = io.BytesIO(response.getvalue())
else:
original_chunk = io.BytesIO(b"".join(response.streaming_content))
if task["data_original_chunk_type"] == self.ChunkType.IMAGESET:
images = self._extract_zip_chunk(original_chunk)
else:
@ -1909,6 +1918,74 @@ class TaskDataAPITestCase(APITestCase):
self._test_api_v1_tasks_id_data_spec(user, task_spec, task_data, self.ChunkType.IMAGESET, self.ChunkType.IMAGESET, image_sizes)
task_spec = {
"name": "use_cache video task #8",
"overlap": 0,
"segment_size": 0,
"labels": [
{"name": "car"},
{"name": "person"},
]
}
task_data = {
"server_files[0]": 'test_video_1.mp4',
"image_quality": 70,
"use_cache": True,
}
image_sizes = self._image_sizes[task_data["server_files[0]"]]
self._test_api_v1_tasks_id_data_spec(user, task_spec, task_data, self.ChunkType.VIDEO,
self.ChunkType.VIDEO, image_sizes, StorageMethodChoice.CACHE)
task_spec = {
"name": "use_cache images task #9",
"overlap": 0,
"segment_size": 0,
"labels": [
{"name": "car"},
{"name": "person"},
]
}
task_data = {
"server_files[0]": "test_1.jpg",
"server_files[1]": "test_2.jpg",
"server_files[2]": "test_3.jpg",
"image_quality": 70,
"use_cache": True,
}
image_sizes = [
self._image_sizes[task_data["server_files[0]"]],
self._image_sizes[task_data["server_files[1]"]],
self._image_sizes[task_data["server_files[2]"]],
]
self._test_api_v1_tasks_id_data_spec(user, task_spec, task_data, self.ChunkType.IMAGESET,
self.ChunkType.IMAGESET, image_sizes, StorageMethodChoice.CACHE)
task_spec = {
"name": "my zip archive task #10",
"overlap": 0,
"segment_size": 0,
"labels": [
{"name": "car"},
{"name": "person"},
]
}
task_data = {
"server_files[0]": "test_archive_1.zip",
"image_quality": 70,
"use_cache": True
}
image_sizes = self._image_sizes[task_data["server_files[0]"]]
self._test_api_v1_tasks_id_data_spec(user, task_spec, task_data, self.ChunkType.IMAGESET,
self.ChunkType.IMAGESET, image_sizes, StorageMethodChoice.CACHE)
def test_api_v1_tasks_id_data_admin(self):
self._test_api_v1_tasks_id_data(self.admin)

@ -22,6 +22,8 @@ os.makedirs(TASKS_ROOT, exist_ok=True)
MODELS_ROOT = os.path.join(DATA_ROOT, 'models')
os.makedirs(MODELS_ROOT, exist_ok=True)
CACHE_ROOT = os.path.join(DATA_ROOT, 'cache')
os.makedirs(CACHE_ROOT, exist_ok=True)
# To avoid ERROR django.security.SuspiciousFileOperation:
# The joined path (...) is located outside of the base path component

Loading…
Cancel
Save