SDK: add a high-level method to download task data chunks (#5356)

main
Roman Donchenko 3 years ago committed by GitHub
parent 85b5547541
commit 0a16cfce5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,9 +9,10 @@ import json
import mimetypes import mimetypes
import os import os
import os.path as osp import os.path as osp
import shutil
from enum import Enum from enum import Enum
from time import sleep from time import sleep
from typing import Any, Dict, List, Optional, Sequence from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
from PIL import Image from PIL import Image
@ -32,6 +33,9 @@ from cvat_sdk.core.proxies.model_proxy import (
from cvat_sdk.core.uploading import AnnotationUploader, DataUploader, Uploader from cvat_sdk.core.uploading import AnnotationUploader, DataUploader, Uploader
from cvat_sdk.core.utils import filter_dict from cvat_sdk.core.utils import filter_dict
if TYPE_CHECKING:
from _typeshed import SupportsWrite
class ResourceType(Enum): class ResourceType(Enum):
LOCAL = 0 LOCAL = 0
@ -153,6 +157,23 @@ class Task(
(_, response) = self.api.retrieve_data(self.id, type="preview") (_, response) = self.api.retrieve_data(self.id, type="preview")
return io.BytesIO(response.data) return io.BytesIO(response.data)
def download_chunk(
self,
chunk_id: int,
output_file: SupportsWrite[bytes],
*,
quality: Optional[str] = None,
) -> None:
params = {}
if quality:
params["quality"] = quality
(_, response) = self.api.retrieve_data(
self.id, number=chunk_id, **params, type="chunk", _parse_response=False
)
with response:
shutil.copyfileobj(response, output_file)
def download_frames( def download_frames(
self, self,
frame_ids: Sequence[int], frame_ids: Sequence[int],

@ -4,6 +4,7 @@
import io import io
import os.path as osp import os.path as osp
import zipfile
from logging import Logger from logging import Logger
from pathlib import Path from pathlib import Path
from typing import Tuple from typing import Tuple
@ -268,6 +269,18 @@ class TestTaskUsecases:
assert osp.isfile(self.tmp_path / "frame-0.jpg") assert osp.isfile(self.tmp_path / "frame-0.jpg")
assert self.stdout.getvalue() == "" assert self.stdout.getvalue() == ""
@pytest.mark.parametrize("quality", ("compressed", "original"))
def test_can_download_chunk(self, fxt_new_task: Task, quality: str):
chunk_path = self.tmp_path / "chunk.zip"
with open(chunk_path, "wb") as chunk_file:
fxt_new_task.download_chunk(0, chunk_file, quality=quality)
with zipfile.ZipFile(chunk_path, "r") as chunk_zip:
assert chunk_zip.testzip() is None
assert len(chunk_zip.infolist()) == 1
assert self.stdout.getvalue() == ""
def test_can_upload_annotations(self, fxt_new_task: Task, fxt_coco_file: Path): def test_can_upload_annotations(self, fxt_new_task: Task, fxt_coco_file: Path):
pbar_out = io.StringIO() pbar_out = io.StringIO()
pbar = make_pbar(file=pbar_out) pbar = make_pbar(file=pbar_out)

Loading…
Cancel
Save