@ -2,23 +2,33 @@
#
# SPDX-License-Identifier: MIT
from __future__ import annotations
from contextlib import closing
from typing import Optional , Tuple
import tqdm
import json
import logging
import os
import os . path as osp
import requests
from io import BytesIO
import mimetypes
from time import sleep
from PIL import Image
from tusclient import client
from tusclient import uploader
from tusclient . request import TusRequest , TusUploadFailed
from utils . cli . core . utils import StreamWithProgress
from . definition import ResourceType
log = logging . getLogger ( __name__ )
class CLI ( ) :
def __init__ ( self , session , api , credentials ) :
class CLI :
def __init__ ( self , session : requests . Session , api : CVAT_API_V2 , credentials : Tuple [ str , str ] ) :
self . api = api
self . session = session
self . login ( credentials )
@ -77,9 +87,13 @@ class CLI():
completion_verification_period = 20 ,
git_completion_verification_period = 2 ,
dataset_repository_url = ' ' ,
lfs = False , * * kwargs ) :
""" Create a new task with the given name and labels JSON and
add the files to it . """
lfs = False , * * kwargs ) - > int :
"""
Create a new task with the given name and labels JSON and
add the files to it .
Returns : id of the created task
"""
url = self . api . tasks
labels = [ ] if kwargs . get ( ' project_id ' ) is not None else labels
data = { ' name ' : name ,
@ -94,7 +108,9 @@ class CLI():
response . raise_for_status ( )
response_json = response . json ( )
log . info ( ' Created task ID: {id} NAME: {name} ' . format ( * * response_json ) )
task_id = response_json [ ' id ' ]
assert isinstance ( task_id , int )
self . tasks_data ( task_id , resource_type , resources , * * kwargs )
if annotation_path != ' ' :
@ -114,6 +130,7 @@ class CLI():
log . info ( logger_string )
self . tasks_upload ( task_id , annotation_format , annotation_path , * * kwargs )
if dataset_repository_url :
response = self . session . post (
self . api . git_create ( task_id ) ,
@ -140,6 +157,8 @@ class CLI():
log . info ( f " Dataset repository creation completed with status: { response_json [ ' status ' ] } . " )
return task_id
def tasks_delete ( self , task_ids , * * kwargs ) :
""" Delete a list of tasks, ignoring those which don ' t exist. """
for task_id in task_ids :
@ -173,9 +192,12 @@ class CLI():
outfile = ' task_ {} _frame_ {:06d} {} ' . format ( task_id , frame_id , im_ext )
im . save ( os . path . join ( outdir , outfile ) )
def tasks_dump ( self , task_id , fileformat , filename , * * kwargs ) :
""" Download annotations for a task in the specified format
( e . g . ' YOLO ZIP 1.0 ' ) . """
def tasks_dump ( self , task_id , fileformat , filename , * ,
pbar = None , completion_check_period = 2 , * * kwargs ) - > None :
"""
Download annotations for a task in the specified format ( e . g . ' YOLO ZIP 1.0 ' ) .
"""
url = self . api . tasks_id ( task_id )
response = self . session . get ( url )
response . raise_for_status ( )
@ -184,70 +206,178 @@ class CLI():
url = self . api . tasks_id_annotations_filename ( task_id ,
response_json [ ' name ' ] ,
fileformat )
log . info ( " Waiting for the server to prepare the file... " )
while True :
response = self . session . get ( url )
response . raise_for_status ( )
log . info ( ' STATUS {} ' . format ( response . status_code ) )
if response . status_code == 201 :
break
sleep ( completion_check_period )
if pbar is None :
pbar = self . _make_pbar ( " Downloading " )
self . _download_file ( url + ' &action=download ' , output_path = filename , pbar = pbar )
log . info ( f " Annotations have been exported to { filename } " )
def _make_tus_uploader ( cli , url , * * kwargs ) :
# Adjusts the library code for CVAT server
# allows to reuse session
class MyTusUploader ( client . Uploader ) :
def __init__ ( self , * _args , session : requests . Session = None , * * _kwargs ) :
self . _session = session
super ( ) . __init__ ( * _args , * * _kwargs )
def _do_request ( self ) :
self . request = TusRequest ( self )
self . request . handle = self . _session
try :
self . request . perform ( )
self . verify_upload ( )
except TusUploadFailed as error :
self . _retry_or_cry ( error )
@uploader._catch_requests_error
def create_url ( self ) :
"""
Return upload url .
Makes request to tus server to create a new upload url for the required file upload .
"""
headers = self . headers
headers [ ' upload-length ' ] = str ( self . file_size )
headers [ ' upload-metadata ' ] = ' , ' . join ( self . encode_metadata ( ) )
headers [ ' origin ' ] = cli . api . host # required by CVAT server
resp = self . _session . post ( self . client . url , headers = headers )
url = resp . headers . get ( " location " )
if url is None :
msg = ' Attempt to retrieve create file url with status {} ' . format ( resp . status_code )
raise uploader . TusCommunicationError ( msg , resp . status_code , resp . content )
return uploader . urljoin ( self . client . url , url )
@uploader._catch_requests_error
def get_offset ( self ) :
"""
Return offset from tus server .
This is different from the instance attribute ' offset ' because this makes an
http request to the tus server to retrieve the offset .
"""
resp = self . _session . head ( self . url , headers = self . headers )
offset = resp . headers . get ( ' upload-offset ' )
if offset is None :
msg = ' Attempt to retrieve offset fails with status {} ' . format ( resp . status_code )
raise uploader . TusCommunicationError ( msg , resp . status_code , resp . content )
return int ( offset )
tus_client = client . TusClient ( url , headers = cli . session . headers )
return MyTusUploader ( client = tus_client , session = cli . session , * * kwargs )
def _upload_file_data_with_tus ( self , url , filename , * , params = None , pbar = None , logger = None ) :
CHUNK_SIZE = 2 * * 20
file_size = os . stat ( filename ) . st_size
with open ( filename , ' rb ' ) as input_file :
if pbar is not None :
input_file = StreamWithProgress ( input_file , pbar , length = file_size )
response = self . session . get ( url + ' &action=download ' )
tus_uploader = self . _make_tus_uploader ( url + ' / ' , metadata = params ,
file_stream = input_file , chunk_size = CHUNK_SIZE , log_func = logger )
tus_uploader . upload ( )
def _upload_file_with_tus ( self , url , filename , * , params = None , pbar = None , logger = None ) :
# "CVAT-TUS" protocol has 2 extra messages
response = self . session . post ( url , headers = { ' Upload-Start ' : ' ' } , params = params )
response . raise_for_status ( )
if response . status_code != 202 :
raise Exception ( " Failed to upload file: "
f " unexpected status code received ( { response . status_code } ) " )
self . _upload_file_data_with_tus ( url = url , filename = filename ,
params = params , pbar = pbar , logger = logger )
with open ( filename , ' wb ' ) as fp :
fp . write ( response . content )
response = self . session . post ( url , headers = { ' Upload-Finish ' : ' ' } , params = params )
response . raise_for_status ( )
if response . status_code != 202 :
raise Exception ( " Failed to upload file: "
f " unexpected status code received ( { response . status_code } ) " )
def tasks_upload ( self , task_id , fileformat , filename , * * kwargs ) :
def tasks_upload ( self , task_id , fileformat , filename , * ,
completion_check_period = 2 , pbar = None , * * kwargs ) :
""" Upload annotations for a task in the specified format
( e . g . ' YOLO ZIP 1.0 ' ) . """
url = self . api . tasks_id_annotations_format ( task_id , fileformat )
url = self . api . tasks_id_annotations ( task_id )
params = {
' format ' : fileformat ,
' filename ' : os . path . basename ( filename )
}
if pbar is None :
pbar = self . _make_pbar ( " Uploading... " )
self . _upload_file_with_tus ( url , filename , params = params , pbar = pbar , logger = log . debug )
while True :
response = self . session . put (
url ,
files = { ' annotation_file ' : open ( filename , ' rb ' ) }
)
response = self . session . put ( url , params = params )
response . raise_for_status ( )
if response . status_code == 201 :
break
logger_string = " Upload job for Task ID {} " . format ( task_id ) + \
" with annotation file {} finished " . format ( filename )
log . info ( logger_string )
sleep ( completion_check_period )
def tasks_export ( self , task_id , filename , export_verification_period = 3 , * * kwargs ) :
""" Export and download a whole task """
export_url = self . api . tasks_id ( task_id ) + ' /backup '
log . info ( f " Upload job for Task ID { task_id } "
f " with annotation file { filename } finished " )
def tasks_export ( self , task_id , filename , * ,
completion_check_period = 2 , pbar = None , * * kwargs ) :
""" Download a task backup """
log . info ( " Waiting for the server to prepare the file... " )
url = self . api . tasks_id_backup ( task_id )
while True :
response = self . session . get ( export_url )
response = self . session . get ( url)
response . raise_for_status ( )
log . info ( ' STATUS {} ' . format ( response . status_code ) )
if response . status_code == 201 :
break
sleep ( export_verification_period )
sleep ( completion_check _period)
response = self . session . get ( export_url + ' ?action=download ' )
response . raise_for_status ( )
if pbar is None :
pbar = self . _make_pbar ( " Downloading " )
self . _download_file ( url + ' ?action=download ' , output_path = filename , pbar = pbar )
with open ( filename , ' wb ' ) as fp :
fp . write ( response . content )
logger_string = " Task {} has been exported sucessfully. " . format ( task_id ) + \
" to {} " . format ( os . path . abspath ( filename ) )
log . info ( logger_string )
log . info ( f " Task { task_id } has been exported sucessfully "
f " to { os . path . abspath ( filename ) } " )
def tasks_import ( self , filename , * ,
completion_check_period = 2 , pbar = None , * * kwargs ) - > None :
""" Import a task from a backup file """
url = self . api . tasks_backup ( )
if pbar is None :
pbar = self . _make_pbar ( " Uploading... " )
file_size = os . stat ( filename ) . st_size
def tasks_import ( self , filename , import_verification_period = 3 , * * kwargs ) :
""" Import a task """
url = self . api . tasks + ' /backup '
with open ( filename , ' rb ' ) as input_file :
input_stream = StreamWithProgress ( input_file , pbar = pbar , length = file_size )
response = self . session . post (
url ,
files = { ' task_file ' : input_file }
files = { ' task_file ' : input_ stream }
)
response . raise_for_status ( )
response_json = response . json ( )
rq_id = response_json [ ' rq_id ' ]
# check task status
while True :
sleep ( import_verification_period )
sleep ( completion_check _period)
response = self . session . post (
url ,
data = { ' rq_id ' : rq_id }
@ -268,8 +398,56 @@ class CLI():
if ' csrftoken ' in response . cookies :
self . session . headers [ ' X-CSRFToken ' ] = response . cookies [ ' csrftoken ' ]
def _make_pbar ( self , title : str = None ) - > tqdm . tqdm :
return tqdm . tqdm ( unit_scale = True , unit = ' B ' , unit_divisor = 1024 , desc = title )
class CVAT_API_V2 ( ) :
def _download_file ( self , url : str , output_path : str , * ,
timeout : int = 60 , pbar : Optional [ tqdm . tqdm ] = None ) - > None :
"""
Downloads the file from url into a temporary file , then renames it
to the requested name .
"""
CHUNK_SIZE = 2 * * 20
assert not osp . exists ( output_path )
tmp_path = output_path + " .tmp "
if osp . exists ( tmp_path ) :
raise FileExistsError ( f " Can ' t write temporary file ' { tmp_path } ' - file exists " )
response = self . session . get ( url , timeout = timeout , stream = True )
with closing ( response ) :
response . raise_for_status ( )
try :
file_size = int ( response . headers . get ( ' Content-Length ' , 0 ) )
except ( ValueError , KeyError ) :
file_size = None
try :
with open ( tmp_path , " wb " ) as fd :
if pbar is not None :
pbar . reset ( file_size )
try :
for chunk in response . iter_content ( chunk_size = CHUNK_SIZE ) :
if pbar is not None :
pbar . update ( n = len ( chunk ) )
fd . write ( chunk )
finally :
if pbar is not None :
pbar . close ( )
os . rename ( tmp_path , output_path )
except :
os . unlink ( tmp_path )
raise
class CVAT_API_V2 :
""" Build parameterized API URLs """
def __init__ ( self , host , https = False ) :
@ -279,7 +457,8 @@ class CVAT_API_V2():
host = host . replace ( ' http:// ' , ' ' )
host = host . replace ( ' https:// ' , ' ' )
scheme = ' https ' if https else ' http '
self . base = ' {} :// {} /api/ ' . format ( scheme , host )
self . host = ' {} :// {} ' . format ( scheme , host )
self . base = self . host + ' /api/ '
self . git = f ' { scheme } :// { host } /git/repository/ '
@property
@ -289,6 +468,9 @@ class CVAT_API_V2():
def tasks_page ( self , page_id ) :
return self . tasks + ' ?page= {} ' . format ( page_id )
def tasks_backup ( self ) :
return self . tasks + ' /backup '
def tasks_id ( self , task_id ) :
return self . tasks + ' / {} ' . format ( task_id )
@ -301,12 +483,18 @@ class CVAT_API_V2():
def tasks_id_status ( self , task_id ) :
return self . tasks_id ( task_id ) + ' /status '
def tasks_id_backup ( self , task_id ) :
return self . tasks_id ( task_id ) + ' /backup '
def tasks_id_annotations ( self , task_id ) :
return self . tasks_id ( task_id ) + ' /annotations '
def tasks_id_annotations_format ( self , task_id , fileformat ) :
return self . tasks_id ( task_id ) + ' /annotations?format= {} ' \
return self . tasks_id _annotations ( task_id ) + ' ?format={} ' \
. format ( fileformat )
def tasks_id_annotations_filename ( self , task_id , name , fileformat ) :
return self . tasks_id ( task_id ) + ' /annotations?format= {} &filename= {} ' \
return self . tasks_id _annotations ( task_id ) + ' ?format={} &filename= {} ' \
. format ( fileformat , name )
def git_create ( self , task_id ) :