You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
662 lines
25 KiB
Python
662 lines
25 KiB
Python
# Copyright (C) 2022 Intel Corporation
|
|
# Copyright (C) 2022 CVAT.ai Corporation
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
import json
|
|
from copy import deepcopy
|
|
from http import HTTPStatus
|
|
from time import sleep
|
|
|
|
import pytest
|
|
from cvat_sdk.api_client import apis, models
|
|
from cvat_sdk.core.helpers import get_paginated_collection
|
|
from deepdiff import DeepDiff
|
|
|
|
from shared.utils.config import get_method, make_api_client, patch_method
|
|
from shared.utils.helpers import generate_image_files
|
|
|
|
from .utils import export_dataset
|
|
|
|
|
|
def get_cloud_storage_content(username, cloud_storage_id, manifest):
    """Fetch the content of a cloud storage on behalf of ``username``.

    Calls ``cloudstorages_api.retrieve_content`` for ``cloud_storage_id`` with
    ``manifest`` passed as ``manifest_path`` and returns the first element of
    the ``(data, response)`` pair produced by that call.
    """
    with make_api_client(username) as client:
        content, _response = client.cloudstorages_api.retrieve_content(
            cloud_storage_id, manifest_path=manifest
        )
    return content
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_class")
class TestGetTasks:
    """Checks which users may list the tasks of a project and see assigned tasks."""

    def _test_task_list_200(self, user, project_id, data, exclude_paths="", **kwargs):
        """Assert that listing the project's tasks as ``user`` yields ``data``.

        The comparison ignores ordering and skips ``exclude_paths``.
        """
        with make_api_client(user) as client:
            listed = get_paginated_collection(
                client.projects_api.list_tasks_endpoint,
                return_json=True,
                id=project_id,
                **kwargs,
            )
            difference = DeepDiff(data, listed, ignore_order=True, exclude_paths=exclude_paths)
            assert difference == {}

    def _test_task_list_403(self, user, project_id, **kwargs):
        """Assert that listing the project's tasks as ``user`` is forbidden."""
        with make_api_client(user) as client:
            # Status checking is disabled so the 403 reaches the assertion below.
            _, reply = client.projects_api.list_tasks(
                project_id, **kwargs, _parse_response=False, _check_status=False
            )
            assert reply.status == HTTPStatus.FORBIDDEN

    def _test_users_to_see_task_list(
        self, project_id, tasks, users, is_staff, is_allow, is_project_staff, **kwargs
    ):
        """Run the 200 or 403 list check for every user with the requested staff status.

        ``is_staff`` selects project staff (truthy) or non-staff (falsy) users;
        ``is_allow`` selects which outcome is expected for them.
        """

        def has_requested_status(candidate):
            staff = is_project_staff(candidate["id"], project_id)
            return staff if is_staff else not staff

        selected = [candidate for candidate in users if has_requested_status(candidate)]
        # An empty selection would make the test pass without checking anything.
        assert selected

        for member in selected:
            name = member["username"]
            if is_allow:
                self._test_task_list_200(name, project_id, tasks, **kwargs)
            else:
                self._test_task_list_403(name, project_id, **kwargs)

    def _test_assigned_users_to_see_task_data(self, tasks, users, is_task_staff, **kwargs):
        """Assert that every staff user of each task finds that task in the task list."""
        for task in tasks:
            task_staff = [person for person in users if is_task_staff(person["id"], task["id"])]
            assert task_staff

            for person in task_staff:
                with make_api_client(person["username"]) as client:
                    _, reply = client.tasks_api.list(**kwargs)
                    assert reply.status == HTTPStatus.OK
                    payload = json.loads(reply.data)

                # NOTE(review): only the "results" of this single list call are
                # inspected - confirm the task cannot fall onto a later page.
                assert any(entry["id"] == task["id"] for entry in payload["results"])

    @pytest.mark.parametrize("project_id", [1])
    @pytest.mark.parametrize(
        "groups, is_staff, is_allow",
        [
            ("admin", False, True),
            ("business", False, False),
        ],
    )
    def test_project_tasks_visibility(
        self, project_id, groups, users, tasks, is_staff, is_allow, find_users, is_project_staff
    ):
        """Users of a privilege group can or cannot list the project's tasks."""
        group_users = find_users(privilege=groups)
        project_tasks = [item for item in tasks if item["project_id"] == project_id]
        assert project_tasks

        self._test_users_to_see_task_list(
            project_id, project_tasks, group_users, is_staff, is_allow, is_project_staff
        )

    @pytest.mark.parametrize("project_id, groups", [(1, "user")])
    def test_task_assigned_to_see_task(
        self, project_id, groups, users, tasks, find_users, is_task_staff
    ):
        """Task staff with the given privilege see the project's assigned tasks."""
        group_users = find_users(privilege=groups)
        assigned_tasks = [
            item for item in tasks if item["project_id"] == project_id and item["assignee"]
        ]
        assert assigned_tasks

        self._test_assigned_users_to_see_task_data(assigned_tasks, group_users, is_task_staff)

    @pytest.mark.parametrize("org, project_id", [({"id": 2, "slug": "org2"}, 2)])
    @pytest.mark.parametrize(
        "role, is_staff, is_allow",
        [
            ("maintainer", False, True),
            ("supervisor", False, False),
        ],
    )
    def test_org_project_tasks_visibility(
        self,
        org,
        project_id,
        role,
        is_staff,
        is_allow,
        tasks,
        is_task_staff,
        is_project_staff,
        find_users,
    ):
        """Organization members with a role can or cannot list the project's tasks."""
        members = find_users(org=org["id"], role=role)
        project_tasks = [item for item in tasks if item["project_id"] == project_id]
        assert project_tasks

        self._test_users_to_see_task_list(
            project_id,
            project_tasks,
            members,
            is_staff,
            is_allow,
            is_project_staff,
            org=org["slug"],
        )

    @pytest.mark.parametrize("org, project_id, role", [({"id": 2, "slug": "org2"}, 2, "worker")])
    def test_org_task_assigneed_to_see_task(
        self, org, project_id, role, users, tasks, find_users, is_task_staff
    ):
        """Task staff among organization members with the role see the assigned tasks."""
        members = find_users(org=org["id"], role=role)
        assigned_tasks = [
            item for item in tasks if item["project_id"] == project_id and item["assignee"]
        ]
        assert assigned_tasks

        self._test_assigned_users_to_see_task_data(
            assigned_tasks, members, is_task_staff, org=org["slug"]
        )
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_function")
class TestPostTasks:
    """Checks which users may create tasks, and that skeleton labels are accepted."""

    def _test_create_task_201(self, user, spec, **kwargs):
        """Create a task from ``spec`` as ``user`` and assert it was created (201)."""
        with make_api_client(user) as api_client:
            (_, response) = api_client.tasks_api.create(spec, **kwargs)
            assert response.status == HTTPStatus.CREATED

    def _test_create_task_403(self, user, spec, **kwargs):
        """Try to create a task from ``spec`` as ``user`` and assert it is forbidden (403)."""
        with make_api_client(user) as api_client:
            # Status checking is disabled so the 403 reaches the assertion below.
            (_, response) = api_client.tasks_api.create(
                spec, **kwargs, _parse_response=False, _check_status=False
            )
            assert response.status == HTTPStatus.FORBIDDEN

    def _test_users_to_create_task_in_project(
        self, project_id, users, is_staff, is_allow, is_project_staff, **kwargs
    ):
        """Run the 201 or 403 creation check for every user with the requested staff status.

        ``is_staff`` selects project staff (truthy) or non-staff (falsy) users;
        ``is_allow`` selects which outcome is expected for them.
        """
        if is_staff:
            users = [user for user in users if is_project_staff(user["id"], project_id)]
        else:
            users = [user for user in users if not is_project_staff(user["id"], project_id)]
        # Without this guard the loop below would silently check nothing.
        assert users, "no users with the requested staff status were found"

        for user in users:
            username = user["username"]
            spec = {
                "name": f"test {username} to create a task within a project",
                "project_id": project_id,
            }

            if is_allow:
                self._test_create_task_201(username, spec, **kwargs)
            else:
                self._test_create_task_403(username, spec, **kwargs)

    @pytest.mark.parametrize("project_id", [1])
    @pytest.mark.parametrize(
        "groups, is_staff, is_allow",
        [
            ("admin", False, True),
            ("business", False, False),
            ("user", True, True),
        ],
    )
    def test_users_to_create_task_in_project(
        self, project_id, groups, is_staff, is_allow, is_project_staff, find_users
    ):
        """Users of a privilege group can or cannot create a task in the project."""
        users = find_users(privilege=groups)
        self._test_users_to_create_task_in_project(
            project_id, users, is_staff, is_allow, is_project_staff
        )

    @pytest.mark.parametrize("org, project_id", [({"id": 2, "slug": "org2"}, 2)])
    @pytest.mark.parametrize(
        "role, is_staff, is_allow",
        [
            ("worker", False, False),
        ],
    )
    def test_worker_cannot_create_task_in_project_without_ownership(
        self, org, project_id, role, is_staff, is_allow, is_project_staff, find_users
    ):
        """Organization workers who are not project staff cannot create a task in it."""
        users = find_users(org=org["id"], role=role)
        self._test_users_to_create_task_in_project(
            project_id, users, is_staff, is_allow, is_project_staff, org=org["slug"]
        )

    def test_can_create_task_with_skeleton(self):
        """A task whose label is a skeleton with sublabels and an SVG can be created."""
        username = "admin1"

        spec = {
            # Interpolate the user actually used for the request instead of
            # repeating it inside a placeholder-less f-string.
            "name": f"test {username} to create a task with skeleton",
            "labels": [
                {
                    "name": "s1",
                    "color": "#5c5eba",
                    "attributes": [
                        {
                            "name": "color",
                            "mutable": False,
                            "input_type": "select",
                            "default_value": "white",
                            "values": ["white", "black"],
                        }
                    ],
                    "type": "skeleton",
                    "sublabels": [
                        {
                            "name": "1",
                            "color": "#d53957",
                            "attributes": [
                                {
                                    "id": 23,
                                    "name": "attr",
                                    "mutable": False,
                                    "input_type": "select",
                                    "default_value": "val1",
                                    "values": ["val1", "val2"],
                                }
                            ],
                            "type": "points",
                        },
                        {"name": "2", "color": "#4925ec", "attributes": [], "type": "points"},
                        {"name": "3", "color": "#59a8fe", "attributes": [], "type": "points"},
                    ],
                    "svg": '<line x1="36.329429626464844" y1="45.98662185668945" x2="59.07190704345703" y2="23.076923370361328" '
                    'stroke="black" data-type="edge" data-node-from="2" stroke-width="0.5" data-node-to="3"></line>'
                    '<line x1="22.61705780029297" y1="25.75250816345215" x2="36.329429626464844" y2="45.98662185668945" '
                    'stroke="black" data-type="edge" data-node-from="1" stroke-width="0.5" data-node-to="2"></line>'
                    '<circle r="1.5" stroke="black" fill="#b3b3b3" cx="22.61705780029297" cy="25.75250816345215" '
                    'stroke-width="0.1" data-type="element node" data-element-id="1" data-node-id="1" data-label-name="1">'
                    '</circle><circle r="1.5" stroke="black" fill="#b3b3b3" cx="36.329429626464844" cy="45.98662185668945" '
                    'stroke-width="0.1" data-type="element node" data-element-id="2" data-node-id="2" data-label-name="2"></circle>'
                    '<circle r="1.5" stroke="black" fill="#b3b3b3" cx="59.07190704345703" cy="23.076923370361328" '
                    'stroke-width="0.1" data-type="element node" data-element-id="3" data-node-id="3" data-label-name="3"></circle>',
                }
            ],
        }

        self._test_create_task_201(username, spec)
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_class")
class TestGetData:
    """Checks the Content-Type returned when retrieving task frame data."""

    # User on whose behalf the frame is requested.
    _USERNAME = "user1"

    @pytest.mark.parametrize(
        "content_type, task_id",
        [
            ("image/png", 8),
            ("image/png", 5),
            ("image/x.point-cloud-data", 6),
        ],
    )
    def test_frame_content_type(self, content_type, task_id):
        """Frame 0 of the task, at original quality, is served with ``content_type``."""
        with make_api_client(self._USERNAME) as client:
            _, reply = client.tasks_api.retrieve_data(
                task_id, type="frame", quality="original", number=0
            )
            assert reply.status == HTTPStatus.OK
            assert reply.headers["Content-Type"] == content_type
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_function")
class TestPatchTaskAnnotations:
    """Checks which users may update the annotations of a task."""

    def _test_check_response(self, is_allow, response, data=None):
        """Assert the outcome of an annotation update.

        A denied request must be 403. An allowed one must be 200 and echo
        ``data`` back, ignoring the top-level ``version`` field.
        """
        if not is_allow:
            assert response.status == HTTPStatus.FORBIDDEN
            return

        assert response.status == HTTPStatus.OK
        returned = json.loads(response.data)
        assert DeepDiff(data, returned, exclude_paths="root['version']") == {}

    @pytest.fixture(scope="class")
    def request_data(self, annotations):
        """Provide a factory that builds a modified annotation payload for a task id."""

        def build_payload(tid):
            # Work on a copy so the shared ``annotations`` fixture stays untouched.
            payload = deepcopy(annotations["task"][str(tid)])
            payload["shapes"][0].update({"points": [2.0, 3.0, 4.0, 5.0, 6.0, 7.0]})
            payload["version"] += 1
            return payload

        return build_payload

    @pytest.mark.parametrize("org", [""])
    @pytest.mark.parametrize(
        "privilege, task_staff, is_allow",
        [
            ("admin", True, True),
            ("admin", False, True),
            ("business", True, True),
            ("business", False, False),
            ("worker", True, True),
            ("worker", False, False),
            ("user", True, True),
            ("user", False, False),
        ],
    )
    def test_user_update_task_annotations(
        self,
        org,
        privilege,
        task_staff,
        is_allow,
        find_task_staff_user,
        find_users,
        request_data,
        tasks_by_org,
        filter_tasks_with_shapes,
    ):
        """A user with the given privilege and staff status can or cannot update annotations."""
        candidates = find_users(privilege=privilege)
        tasks_with_shapes = filter_tasks_with_shapes(tasks_by_org[org])
        username, tid = find_task_staff_user(tasks_with_shapes, candidates, task_staff)

        payload = request_data(tid)
        with make_api_client(username) as client:
            # The request gets its own copy so ``payload`` can be compared afterwards.
            _, reply = client.tasks_api.partial_update_annotations(
                id=tid,
                action="update",
                org=org,
                patched_labeled_data_request=deepcopy(payload),
                _parse_response=False,
                _check_status=False,
            )

        self._test_check_response(is_allow, reply, payload)

    @pytest.mark.parametrize("org", [2])
    @pytest.mark.parametrize(
        "role, task_staff, is_allow",
        [
            ("maintainer", False, True),
            ("owner", False, True),
            ("supervisor", False, False),
            ("worker", False, False),
            ("maintainer", True, True),
            ("owner", True, True),
            ("supervisor", True, True),
            ("worker", True, True),
        ],
    )
    def test_member_update_task_annotation(
        self,
        org,
        role,
        task_staff,
        is_allow,
        find_task_staff_user,
        find_users,
        tasks_by_org,
        request_data,
    ):
        """An organization member with the given role can or cannot update annotations."""
        members = find_users(role=role, org=org)
        org_tasks = tasks_by_org[org]
        # NOTE(review): the meaning of the [12, 14] argument is defined by the
        # find_task_staff_user fixture, which is not visible here.
        username, tid = find_task_staff_user(org_tasks, members, task_staff, [12, 14])

        payload = request_data(tid)
        with make_api_client(username) as client:
            # The request gets its own copy so ``payload`` can be compared afterwards.
            _, reply = client.tasks_api.partial_update_annotations(
                id=tid,
                org_id=org,
                action="update",
                patched_labeled_data_request=deepcopy(payload),
                _parse_response=False,
                _check_status=False,
            )

        self._test_check_response(is_allow, reply, payload)
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_class")
class TestGetTaskDataset:
    """Checks that a task dataset can be exported."""

    def _test_export_task(self, username, tid, **kwargs):
        """Export the dataset of task ``tid`` as ``username`` and return the result."""
        with make_api_client(username) as client:
            endpoint = client.tasks_api.retrieve_dataset_endpoint
            return export_dataset(endpoint, id=tid, **kwargs)

    def test_can_export_task_dataset(self, admin_user, tasks_with_shapes):
        """Exporting the first task with shapes in "CVAT for images 1.1" yields data."""
        first_task = tasks_with_shapes[0]
        exported = self._test_export_task(
            admin_user, first_task["id"], format="CVAT for images 1.1"
        )
        assert exported.data
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_function")
@pytest.mark.usefixtures("restore_cvat_data")
class TestPostTaskData:
    """Checks task creation together with uploading or attaching task data."""

    # User on whose behalf every request in this class is made.
    _USERNAME = "admin1"

    @staticmethod
    def _wait_until_task_is_created(api: apis.TasksApi, task_id: int) -> models.RqStatus:
        """Poll the task status until it reaches a final state.

        Queries ``retrieve_status`` up to 100 times, sleeping one second
        between attempts, and returns the status as soon as its state is
        "Finished" or "Failed".

        Raises:
            Exception: if no final state is reached within the attempts.
        """
        for _ in range(100):
            (status, _) = api.retrieve_status(task_id)
            if status.state.value in ["Finished", "Failed"]:
                return status
            sleep(1)
        raise Exception("Cannot create task")

    def _test_create_task(self, username, spec, data, content_type, **kwargs):
        """Create a task from ``spec``, attach ``data`` to it and return the task id.

        Asserts that the task is created (201), that the data request is
        accepted (202) and that the data processing ends in the "Finished" state.
        """
        with make_api_client(username) as api_client:
            (task, response) = api_client.tasks_api.create(spec, **kwargs)
            assert response.status == HTTPStatus.CREATED

            # The request gets its own copy of ``data`` so the caller's dict is
            # not shared with the API client.
            (_, response) = api_client.tasks_api.create_data(
                task.id, data_request=deepcopy(data), _content_type=content_type, **kwargs
            )
            assert response.status == HTTPStatus.ACCEPTED

            status = self._wait_until_task_is_created(api_client.tasks_api, task.id)
            assert status.state.value == "Finished"

        return task.id

    def test_can_create_task_with_defined_start_and_stop_frames(self):
        """A task created from 7 images with start_frame=2 and stop_frame=5 has size 4."""
        task_spec = {
            "name": f"test {self._USERNAME} to create a task with defined start and stop frames",
            "labels": [
                {
                    "name": "car",
                    "color": "#ff00ff",
                    "attributes": [
                        {
                            "name": "a",
                            "mutable": True,
                            "input_type": "number",
                            "default_value": "5",
                            "values": ["4", "5", "6"],
                        }
                    ],
                }
            ],
        }

        task_data = {
            "image_quality": 75,
            "start_frame": 2,
            "stop_frame": 5,
            "client_files": generate_image_files(7),
        }

        task_id = self._test_create_task(
            self._USERNAME, task_spec, task_data, content_type="multipart/form-data"
        )

        # check task size
        with make_api_client(self._USERNAME) as api_client:
            (task, _) = api_client.tasks_api.retrieve(task_id)
            assert task.size == 4

    def test_can_get_annotations_from_new_task_with_skeletons(self):
        """Annotations of a new skeleton task can be read back after being created."""
        spec = {
            # Interpolate the user actually used for the requests instead of
            # repeating it inside a placeholder-less f-string.
            "name": f"test {self._USERNAME} to create a task with skeleton",
            "labels": [
                {
                    "name": "s1",
                    "color": "#5c5eba",
                    "attributes": [],
                    "type": "skeleton",
                    "sublabels": [
                        {"name": "1", "color": "#d12345", "attributes": [], "type": "points"},
                        {"name": "2", "color": "#350dea", "attributes": [], "type": "points"},
                    ],
                    "svg": '<line x1="19.464284896850586" y1="21.922269821166992" x2="54.08613586425781" y2="43.60293960571289" '
                    'stroke="black" data-type="edge" data-node-from="1" stroke-width="0.5" data-node-to="2"></line>'
                    '<circle r="1.5" stroke="black" fill="#b3b3b3" cx="19.464284896850586" cy="21.922269821166992" '
                    'stroke-width="0.1" data-type="element node" data-element-id="1" data-node-id="1" data-label-id="103"></circle>'
                    '<circle r="1.5" stroke="black" fill="#b3b3b3" cx="54.08613586425781" cy="43.60293960571289" '
                    'stroke-width="0.1" data-type="element node" data-element-id="2" data-node-id="2" data-label-id="104"></circle>',
                }
            ],
        }

        task_data = {
            "image_quality": 75,
            "client_files": generate_image_files(3),
        }

        task_id = self._test_create_task(
            self._USERNAME, spec, task_data, content_type="multipart/form-data"
        )

        response = get_method(self._USERNAME, f"tasks/{task_id}")
        task_info = response.json()

        # Group the label ids of the new task by label type ("skeleton" / "points").
        label_ids = {}
        for label in task_info["labels"]:
            label_ids.setdefault(label["type"], []).append(label["id"])

        job_id = task_info["segments"][0]["jobs"][0]["id"]
        patch_data = {
            "shapes": [
                {
                    "type": "skeleton",
                    "occluded": False,
                    "outside": False,
                    "z_order": 0,
                    "rotation": 0,
                    "points": [],
                    "frame": 0,
                    "label_id": label_ids["skeleton"][0],
                    "group": 0,
                    "source": "manual",
                    "attributes": [],
                    "elements": [
                        {
                            "type": "points",
                            "occluded": False,
                            "outside": False,
                            "z_order": 0,
                            "rotation": 0,
                            "points": [131.63947368421032, 165.0868421052637],
                            "frame": 0,
                            "label_id": label_ids["points"][0],
                            "group": 0,
                            "source": "manual",
                            "attributes": [],
                        },
                        {
                            "type": "points",
                            "occluded": False,
                            "outside": False,
                            "z_order": 0,
                            "rotation": 0,
                            "points": [354.98157894736823, 304.2710526315795],
                            "frame": 0,
                            "label_id": label_ids["points"][1],
                            "group": 0,
                            "source": "manual",
                            "attributes": [],
                        },
                    ],
                }
            ],
            "tracks": [
                {
                    "frame": 0,
                    "label_id": label_ids["skeleton"][0],
                    "group": 0,
                    "source": "manual",
                    "shapes": [
                        {
                            "type": "skeleton",
                            "occluded": False,
                            "outside": False,
                            "z_order": 0,
                            "rotation": 0,
                            "points": [],
                            "frame": 0,
                            "attributes": [],
                        }
                    ],
                    "attributes": [],
                    "elements": [
                        {
                            "frame": 0,
                            "label_id": label_ids["points"][0],
                            "group": 0,
                            "source": "manual",
                            "shapes": [
                                {
                                    "type": "points",
                                    "occluded": False,
                                    "outside": False,
                                    "z_order": 0,
                                    "rotation": 0,
                                    "points": [295.6394736842103, 472.5868421052637],
                                    "frame": 0,
                                    "attributes": [],
                                }
                            ],
                            "attributes": [],
                        },
                        {
                            "frame": 0,
                            "label_id": label_ids["points"][1],
                            "group": 0,
                            "source": "manual",
                            "shapes": [
                                {
                                    "type": "points",
                                    "occluded": False,
                                    "outside": False,
                                    "z_order": 0,
                                    "rotation": 0,
                                    "points": [619.3236842105262, 846.9815789473689],
                                    "frame": 0,
                                    "attributes": [],
                                }
                            ],
                            "attributes": [],
                        },
                    ],
                }
            ],
            "tags": [],
            "version": 0,
        }

        response = patch_method(
            self._USERNAME, f"jobs/{job_id}/annotations", patch_data, action="create"
        )
        # If the upload itself failed, the read-back below would still succeed
        # on an unannotated job and the test would prove nothing.
        assert response.status_code == HTTPStatus.OK

        response = get_method(self._USERNAME, f"jobs/{job_id}/annotations")
        assert response.status_code == HTTPStatus.OK

    @pytest.mark.parametrize(
        "cloud_storage_id, manifest, use_bucket_content, org",
        [
            (1, "manifest.jsonl", False, ""),  # public bucket
            (2, "sub/manifest.jsonl", True, "org2"),  # private bucket
        ],
    )
    def test_create_task_with_cloud_storage_files(
        self, cloud_storage_id, manifest, use_bucket_content, org
    ):
        """A task can be created from files that live in a cloud storage."""
        if use_bucket_content:
            cloud_storage_content = get_cloud_storage_content(
                self._USERNAME, cloud_storage_id, manifest
            )
        else:
            cloud_storage_content = ["image_case_65_1.png", "image_case_65_2.png"]
        # The manifest is sent along with the files in the server_files list.
        cloud_storage_content.append(manifest)

        task_spec = {
            "name": f"Task with files from cloud storage {cloud_storage_id}",
            "labels": [
                {
                    "name": "car",
                }
            ],
        }

        data_spec = {
            "image_quality": 75,
            "use_cache": True,
            "storage": "cloud_storage",
            "cloud_storage_id": cloud_storage_id,
            "server_files": cloud_storage_content,
        }

        self._test_create_task(
            self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
        )
|