@ -425,7 +425,8 @@ class TestPostTaskData:
sleep ( 1 )
raise Exception ( " Cannot create task " )
def _test_create_task ( self , username , spec , data , content_type , * * kwargs ) :
@staticmethod
def _test_create_task ( username , spec , data , content_type , * * kwargs ) :
with make_api_client ( username ) as api_client :
( task , response ) = api_client . tasks_api . create ( spec , * * kwargs )
assert response . status == HTTPStatus . CREATED
@ -435,7 +436,7 @@ class TestPostTaskData:
)
assert response . status == HTTPStatus . ACCEPTED
status = self . _wait_until_task_is_created ( api_client . tasks_api , task . id )
status = TestPostTaskData . _wait_until_task_is_created ( api_client . tasks_api , task . id )
assert status . state . value == " Finished "
return task . id
@ -798,16 +799,22 @@ class TestPostTaskData:
status = self . _test_cannot_create_task ( self . _USERNAME , task_spec , data_spec )
assert " No media data found " in status . message
@pytest.mark.usefixtures ( " restore_db_per_function " )
@pytest.mark.usefixtures ( " restore_cvat_data " )
class TestWorkWithTask :
_USERNAME = " admin1 "
@pytest.mark.with_external_services
@pytest.mark.parametrize (
" cloud_storage_id, manifest, org " ,
[ ( 1 , " manifest.jsonl " , " " ) ] , # public bucket
)
def test_cannot_create_task_with_mythical_cloud_storage_data (
self , cloud_storage_id , manifest , org
def test_ work_with_task_containing_non_stable_cloud_storage_files (
self , cloud_storage_id , manifest , org , cloud_storages , request
) :
mythical_file = " mythical.jp g"
cloud_storage_content = [ mythical_fil e, manifest ]
image_name = " image_case_65_1.pn g"
cloud_storage_content = [ image_nam e, manifest ]
task_spec = {
" name " : f " Task with mythical file from cloud storage { cloud_storage_id } " ,
@ -821,8 +828,32 @@ class TestPostTaskData:
" server_files " : cloud_storage_content ,
}
status = self . _test_cannot_create_task ( self . _USERNAME , task_spec , data_spec , org = org )
assert mythical_file in status . message
task_id = TestPostTaskData . _test_create_task (
self . _USERNAME , task_spec , data_spec , content_type = " application/json " , org = org
)
# save image from the "public" bucket and remove it temporary
s3_client = s3 . make_client ( )
bucket_name = cloud_storages [ cloud_storage_id ] [ " resource " ]
image = s3_client . download_fileobj ( bucket_name , image_name )
s3_client . remove_file ( bucket_name , image_name )
request . addfinalizer (
partial ( s3_client . create_file , bucket = bucket_name , filename = image_name , data = image )
)
with make_api_client ( self . _USERNAME ) as api_client :
try :
api_client . tasks_api . retrieve_data (
task_id , number = 0 , quality = " original " , type = " frame "
)
raise AssertionError ( " Frame should not exist " )
except AssertionError :
raise
except Exception as ex :
assert ex . status == HTTPStatus . NOT_FOUND
assert image_name in ex . body
@pytest.mark.usefixtures ( " restore_db_per_class " )