@ -7,10 +7,9 @@ from __future__ import annotations
import io
import json
import mimetypes
import os
import os . path as osp
import shutil
from enum import Enum
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING , Any , Dict , List , Optional , Sequence
@ -34,7 +33,7 @@ from cvat_sdk.core.uploading import AnnotationUploader, DataUploader, Uploader
from cvat_sdk . core . utils import filter_dict
if TYPE_CHECKING :
from _typeshed import S upportsWrite
from _typeshed import S trPath, S upportsWrite
class ResourceType ( Enum ) :
@ -67,7 +66,7 @@ class Task(
def upload_data (
self ,
resource_type : ResourceType ,
resources : Sequence [ str ] ,
resources : Sequence [ StrPath ] ,
* ,
pbar : Optional [ ProgressReporter ] = None ,
params : Optional [ Dict [ str , Any ] ] = None ,
@ -77,15 +76,8 @@ class Task(
"""
params = params or { }
data = { }
if resource_type is ResourceType . LOCAL :
pass # handled later
elif resource_type is ResourceType . REMOTE :
data [ " remote_files " ] = resources
elif resource_type is ResourceType . SHARE :
data [ " server_files " ] = resources
data = { " image_quality " : 70 }
data [ " image_quality " ] = 70
data . update (
filter_dict (
params ,
@ -105,6 +97,15 @@ class Task(
data [ " frame_filter " ] = f " step= { params . get ( ' frame_step ' ) } "
if resource_type in [ ResourceType . REMOTE , ResourceType . SHARE ] :
for resource in resources :
if not isinstance ( resource , str ) :
raise TypeError ( f " resources: expected instances of str, got { type ( resource ) } " )
if resource_type is ResourceType . REMOTE :
data [ " remote_files " ] = resources
elif resource_type is ResourceType . SHARE :
data [ " server_files " ] = resources
self . api . create_data (
self . id ,
data_request = models . DataRequest ( * * data ) ,
@ -114,12 +115,14 @@ class Task(
self . api . create_data_endpoint . path , kwsub = { " id " : self . id }
)
DataUploader ( self . _client ) . upload_files ( url , resources , pbar = pbar , * * data )
DataUploader ( self . _client ) . upload_files (
url , list ( map ( Path , resources ) ) , pbar = pbar , * * data
)
def import_annotations (
self ,
format_name : str ,
filename : str ,
filename : StrPath ,
* ,
status_check_period : Optional [ int ] = None ,
pbar : Optional [ ProgressReporter ] = None ,
@ -128,6 +131,8 @@ class Task(
Upload annotations for a task in the specified format ( e . g . ' YOLO ZIP 1.0 ' ) .
"""
filename = Path ( filename )
AnnotationUploader ( self . _client ) . upload_file_and_wait (
self . api . create_annotations_endpoint ,
filename ,
@ -178,7 +183,7 @@ class Task(
self ,
frame_ids : Sequence [ int ] ,
* ,
outdir : str = " " ,
outdir : StrPath = " . " ,
quality : str = " original " ,
filename_pattern : str = " frame_ {frame_id:06d} {frame_ext} " ,
) - > Optional [ List [ Image . Image ] ] :
@ -186,7 +191,9 @@ class Task(
Download the requested frame numbers for a task and save images as outdir / filename_pattern
"""
# TODO: add arg descriptions in schema
os . makedirs ( outdir , exist_ok = True )
outdir = Path ( outdir )
outdir . mkdir ( exist_ok = True )
for frame_id in frame_ids :
frame_bytes = self . get_frame ( frame_id , quality = quality )
@ -202,12 +209,12 @@ class Task(
im_ext = " .jpg "
outfile = filename_pattern . format ( frame_id = frame_id , frame_ext = im_ext )
im . save ( o sp. join ( outdir , outfile ) )
im . save ( o utdir / outfile )
def export_dataset (
self ,
format_name : str ,
filename : str ,
filename : StrPath ,
* ,
pbar : Optional [ ProgressReporter ] = None ,
status_check_period : Optional [ int ] = None ,
@ -216,6 +223,9 @@ class Task(
"""
Download annotations for a task in the specified format ( e . g . ' YOLO ZIP 1.0 ' ) .
"""
filename = Path ( filename )
if include_images :
endpoint = self . api . retrieve_dataset_endpoint
else :
@ -234,7 +244,7 @@ class Task(
def download_backup (
self ,
filename : str ,
filename : StrPath ,
* ,
status_check_period : int = None ,
pbar : Optional [ ProgressReporter ] = None ,
@ -243,6 +253,8 @@ class Task(
Download a task backup
"""
filename = Path ( filename )
Downloader ( self . _client ) . prepare_and_download_file_from_endpoint (
self . api . retrieve_backup_endpoint ,
filename = filename ,
@ -370,7 +382,7 @@ class TasksRepo(
def create_from_backup (
self ,
filename : str ,
filename : StrPath ,
* ,
status_check_period : int = None ,
pbar : Optional [ ProgressReporter ] = None ,
@ -378,10 +390,13 @@ class TasksRepo(
"""
Import a task from a backup file
"""
filename = Path ( filename )
if status_check_period is None :
status_check_period = self . _client . config . status_check_period
params = { " filename " : osp. basename ( filename ) }
params = { " filename " : filename. name }
url = self . _client . api_map . make_endpoint_url ( self . api . create_backup_endpoint . path )
uploader = Uploader ( self . _client )
response = uploader . upload_file (