@ -2767,6 +2767,13 @@ class TaskDataAPITestCase(APITestCase):
shutil . rmtree ( root_path )
cls . _image_sizes [ filename ] = image_sizes
file_name = ' test_1.pdf '
path = os . path . join ( settings . SHARE_ROOT , file_name )
img_sizes , data = generate_pdf_file ( file_name , page_count = 5 )
with open ( path , " wb " ) as pdf_file :
pdf_file . write ( data . read ( ) )
cls . _image_sizes [ file_name ] = img_sizes
generate_manifest_file ( data_type = ' video ' , manifest_path = os . path . join ( settings . SHARE_ROOT , ' videos ' , ' manifest.jsonl ' ) ,
sources = [ os . path . join ( settings . SHARE_ROOT , ' videos ' , ' test_video_1.mp4 ' ) ] )
@ -2804,6 +2811,9 @@ class TaskDataAPITestCase(APITestCase):
path = os . path . join ( settings . SHARE_ROOT , " manifest.jsonl " )
os . remove ( path )
path = os . path . join ( settings . SHARE_ROOT , " test_1.pdf " )
os . remove ( path )
def _run_api_v1_tasks_id_data_post ( self , tid , user , data ) :
with ForceLogin ( user , self . client ) :
response = self . client . post ( ' /api/v1/tasks/ {} /data ' . format ( tid ) ,
@ -2886,10 +2896,12 @@ class TaskDataAPITestCase(APITestCase):
db_data = Task . objects . get ( pk = task_id ) . data
self . assertEqual ( expected_storage_method , db_data . storage_method )
self . assertEqual ( expected_uploaded_data_location , db_data . storage )
# check if used share without copying inside and files doesn`t exist in ../raw/
# check if used share without copying inside and files doesn`t exist in ../raw/ and exist in share
if expected_uploaded_data_location is StorageChoice . SHARE :
self . assertEqual ( False ,
os . path . exists ( os . path . join ( db_data . get_upload_dirname ( ) , next ( iter ( data . values ( ) ) ) ) ) )
raw_file_path = os . path . join ( db_data . get_upload_dirname ( ) , next ( iter ( data . values ( ) ) ) )
share_file_path = os . path . join ( settings . SHARE_ROOT , next ( iter ( data . values ( ) ) ) )
self . assertEqual ( False , os . path . exists ( raw_file_path ) )
self . assertEqual ( True , os . path . exists ( share_file_path ) )
# check preview
response = self . _get_preview ( task_id , user )
@ -2956,6 +2968,10 @@ class TaskDataAPITestCase(APITestCase):
for f in source_files :
if zipfile . is_zipfile ( f ) :
source_images . extend ( self . _extract_zip_chunk ( f , dimension = dimension ) )
elif isinstance ( f , str ) and f . endswith ( ' .pdf ' ) :
with open ( f , ' rb ' ) as pdf_file :
source_images . extend ( convert_from_bytes ( pdf_file . read ( ) ,
fmt = ' png ' ) )
elif isinstance ( f , io . BytesIO ) and \
str ( getattr ( f , ' name ' , None ) ) . endswith ( ' .pdf ' ) :
source_images . extend ( convert_from_bytes ( f . getvalue ( ) ,
@ -3475,6 +3491,18 @@ class TaskDataAPITestCase(APITestCase):
self . _test_api_v1_tasks_id_data_spec ( user , task_spec , task_data , self . ChunkType . IMAGESET , self . ChunkType . IMAGESET ,
image_sizes , StorageMethodChoice . CACHE , StorageChoice . SHARE )
task_spec . update ( [ ( ' name ' , ' task pdf in the shared folder #30 ' ) ] )
task_data = {
" server_files[0] " : " test_1.pdf " ,
" image_quality " : 70 ,
" copy_data " : False ,
" use_cache " : True ,
}
image_sizes = self . _image_sizes [ task_data [ " server_files[0] " ] ]
self . _test_api_v1_tasks_id_data_spec ( user , task_spec , task_data , self . ChunkType . IMAGESET , self . ChunkType . IMAGESET ,
image_sizes , StorageMethodChoice . CACHE , StorageChoice . LOCAL )
def test_api_v1_tasks_id_data_admin ( self ) :
self . _test_api_v1_tasks_id_data ( self . admin )