# Copyright (C) 2022 Intel Corporation
# Copyright (C) 2022 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import json
from copy import deepcopy
from http import HTTPStatus
from time import sleep

import pytest
from cvat_sdk.api_client import apis, models
from cvat_sdk.core.helpers import get_paginated_collection
from deepdiff import DeepDiff

from shared.utils.config import get_method, make_api_client, patch_method
from shared.utils.helpers import generate_image_files

from .utils import export_dataset


def get_cloud_storage_content(username, cloud_storage_id, manifest):
    """Return the content listing of a cloud storage as seen by ``username``.

    ``manifest`` is forwarded to the API as ``manifest_path``.
    """
    with make_api_client(username) as api_client:
        (data, _) = api_client.cloudstorages_api.retrieve_content(
            cloud_storage_id, manifest_path=manifest
        )

    return data


@pytest.mark.usefixtures("dontchangedb")
class TestGetTasks:
    """Checks of who can list the tasks of a project and see assigned tasks."""

    def _test_task_list_200(self, user, project_id, data, exclude_paths="", **kwargs):
        """Assert that ``user`` gets exactly ``data`` as the project's task list."""
        with make_api_client(user) as api_client:
            results = get_paginated_collection(
                api_client.projects_api.list_tasks_endpoint,
                return_json=True,
                id=project_id,
                **kwargs,
            )
            assert DeepDiff(data, results, ignore_order=True, exclude_paths=exclude_paths) == {}

    def _test_task_list_403(self, user, project_id, **kwargs):
        """Assert that listing the project's tasks is forbidden for ``user``."""
        with make_api_client(user) as api_client:
            # Status checking is disabled so the 403 can be asserted on directly.
            (_, response) = api_client.projects_api.list_tasks(
                project_id, **kwargs, _parse_response=False, _check_status=False
            )
            assert response.status == HTTPStatus.FORBIDDEN

    def _test_users_to_see_task_list(
        self, project_id, tasks, users, is_staff, is_allow, is_project_staff, **kwargs
    ):
        """Run the 200 or 403 list check for every user matching ``is_staff``.

        ``users`` is narrowed to project staff (``is_staff`` truthy) or to
        non-staff (``is_staff`` falsy); at least one user must remain.
        """
        if is_staff:
            users = [user for user in users if is_project_staff(user["id"], project_id)]
        else:
            users = [user for user in users if not is_project_staff(user["id"], project_id)]
        assert users

        for user in users:
            if is_allow:
                self._test_task_list_200(user["username"], project_id, tasks, **kwargs)
            else:
                self._test_task_list_403(user["username"], project_id, **kwargs)

    def _test_assigned_users_to_see_task_data(self, tasks, users, is_task_staff, **kwargs):
        """Assert that every staff user of each task finds it in ``tasks_api.list``."""
        for task in tasks:
            staff_users = [user for user in users if is_task_staff(user["id"], task["id"])]
            assert staff_users

            for user in staff_users:
                with make_api_client(user["username"]) as api_client:
                    (_, response) = api_client.tasks_api.list(**kwargs)
                    assert response.status == HTTPStatus.OK
                    response_data = json.loads(response.data)

                assert any(_task["id"] == task["id"] for _task in response_data["results"])

    @pytest.mark.parametrize("project_id", [1])
    @pytest.mark.parametrize(
        "groups, is_staff, is_allow",
        [
            ("admin", False, True),
            ("business", False, False),
        ],
    )
    def test_project_tasks_visibility(
        self, project_id, groups, users, tasks, is_staff, is_allow, find_users, is_project_staff
    ):
        users = find_users(privilege=groups)
        tasks = [task for task in tasks if task["project_id"] == project_id]
        assert tasks

        self._test_users_to_see_task_list(
            project_id, tasks, users, is_staff, is_allow, is_project_staff
        )

    @pytest.mark.parametrize("project_id, groups", [(1, "user")])
    def test_task_assigned_to_see_task(
        self, project_id, groups, users, tasks, find_users, is_task_staff
    ):
        users = find_users(privilege=groups)
        tasks = [
            task for task in tasks if task["project_id"] == project_id and task["assignee"]
        ]
        assert tasks

        self._test_assigned_users_to_see_task_data(tasks, users, is_task_staff)

    @pytest.mark.parametrize("org, project_id", [({"id": 2, "slug": "org2"}, 2)])
    @pytest.mark.parametrize(
        "role, is_staff, is_allow",
        [
            ("maintainer", False, True),
            ("supervisor", False, False),
        ],
    )
    def test_org_project_tasks_visibility(
        self,
        org,
        project_id,
        role,
        is_staff,
        is_allow,
        tasks,
        is_task_staff,
        is_project_staff,
        find_users,
    ):
        users = find_users(org=org["id"], role=role)
        tasks = [task for task in tasks if task["project_id"] == project_id]
        assert tasks

        self._test_users_to_see_task_list(
            project_id, tasks, users, is_staff, is_allow, is_project_staff, org=org["slug"]
        )

    @pytest.mark.parametrize("org, project_id, role", [({"id": 2, "slug": "org2"}, 2, "worker")])
    def test_org_task_assigneed_to_see_task(
        self, org, project_id, role, users, tasks, find_users, is_task_staff
    ):
        users = find_users(org=org["id"], role=role)
        tasks = [
            task for task in tasks if task["project_id"] == project_id and task["assignee"]
        ]
        assert tasks

        self._test_assigned_users_to_see_task_data(tasks, users, is_task_staff, org=org["slug"])


@pytest.mark.usefixtures("changedb")
class TestPostTasks:
    """Checks of who can create tasks, including tasks with skeleton labels."""

    def _test_create_task_201(self, user, spec, **kwargs):
        """Assert that ``user`` can create a task from ``spec``."""
        with make_api_client(user) as api_client:
            (_, response) = api_client.tasks_api.create(spec, **kwargs)
            assert response.status == HTTPStatus.CREATED

    def _test_create_task_403(self, user, spec, **kwargs):
        """Assert that creating a task from ``spec`` is forbidden for ``user``."""
        with make_api_client(user) as api_client:
            # Status checking is disabled so the 403 can be asserted on directly.
            (_, response) = api_client.tasks_api.create(
                spec, **kwargs, _parse_response=False, _check_status=False
            )
            assert response.status == HTTPStatus.FORBIDDEN

    def _test_users_to_create_task_in_project(
        self, project_id, users, is_staff, is_allow, is_project_staff, **kwargs
    ):
        """Run the 201 or 403 creation check for every user matching ``is_staff``."""
        if is_staff:
            users = [user for user in users if is_project_staff(user["id"], project_id)]
        else:
            users = [user for user in users if not is_project_staff(user["id"], project_id)]
        assert users

        for user in users:
            username = user["username"]
            spec = {
                "name": f"test {username} to create a task within a project",
                "project_id": project_id,
            }

            if is_allow:
                self._test_create_task_201(username, spec, **kwargs)
            else:
                self._test_create_task_403(username, spec, **kwargs)

    @pytest.mark.parametrize("project_id", [1])
    @pytest.mark.parametrize(
        "groups, is_staff, is_allow",
        [
            ("admin", False, True),
            ("business", False, False),
            ("user", True, True),
        ],
    )
    def test_users_to_create_task_in_project(
        self, project_id, groups, is_staff, is_allow, is_project_staff, find_users
    ):
        users = find_users(privilege=groups)
        self._test_users_to_create_task_in_project(
            project_id, users, is_staff, is_allow, is_project_staff
        )

    @pytest.mark.parametrize("org, project_id", [({"id": 2, "slug": "org2"}, 2)])
    @pytest.mark.parametrize(
        "role, is_staff, is_allow",
        [
            ("worker", False, False),
        ],
    )
    def test_worker_cannot_create_task_in_project_without_ownership(
        self, org, project_id, role, is_staff, is_allow, is_project_staff, find_users
    ):
        users = find_users(org=org["id"], role=role)
        self._test_users_to_create_task_in_project(
            project_id, users, is_staff, is_allow, is_project_staff, org=org["slug"]
        )

    def test_can_create_task_with_skeleton(self):
        username = "admin1"

        spec = {
            "name": "test admin1 to create a task with skeleton",
            "labels": [
                {
                    "name": "s1",
                    "color": "#5c5eba",
                    "attributes": [
                        {
                            "name": "color",
                            "mutable": False,
                            "input_type": "select",
                            "default_value": "white",
                            "values": ["white", "black"],
                        }
                    ],
                    "type": "skeleton",
                    "sublabels": [
                        {
                            "name": "1",
                            "color": "#d53957",
                            "attributes": [
                                {
                                    "id": 23,
                                    "name": "attr",
                                    "mutable": False,
                                    "input_type": "select",
                                    "default_value": "val1",
                                    "values": ["val1", "val2"],
                                }
                            ],
                            "type": "points",
                        },
                        {"name": "2", "color": "#4925ec", "attributes": [], "type": "points"},
                        {"name": "3", "color": "#59a8fe", "attributes": [], "type": "points"},
                    ],
                    # NOTE(review): this literal evaluates to an empty string; the SVG
                    # markup may have been lost -- confirm against the intended value.
                    "svg": '' '' '' '' '',
                }
            ],
        }

        self._test_create_task_201(username, spec)


@pytest.mark.usefixtures("dontchangedb")
class TestGetData:
    """Checks of the Content-Type returned for task frame data."""

    _USERNAME = "user1"

    @pytest.mark.parametrize(
        "content_type, task_id",
        [
            ("image/png", 8),
            ("image/png", 5),
            ("image/x.point-cloud-data", 6),
        ],
    )
    def test_frame_content_type(self, content_type, task_id):
        with make_api_client(self._USERNAME) as api_client:
            (_, response) = api_client.tasks_api.retrieve_data(
                task_id, type="frame", quality="original", number=0
            )
            assert response.status == HTTPStatus.OK
            assert response.headers["Content-Type"] == content_type


@pytest.mark.usefixtures("changedb")
class TestPatchTaskAnnotations:
    """Checks of who can update task annotations via PATCH."""

    def _test_check_response(self, is_allow, response, data=None):
        """Assert a 200 echoing ``data`` (ignoring ``version``) or a 403."""
        if is_allow:
            assert response.status == HTTPStatus.OK
            assert DeepDiff(data, json.loads(response.data), exclude_paths="root['version']") == {}
        else:
            assert response.status == HTTPStatus.FORBIDDEN

    @pytest.fixture(scope="class")
    def request_data(self, annotations):
        """Provide a factory building an update payload for a given task id."""

        def get_data(tid):
            # Work on a copy so the shared ``annotations`` fixture is not mutated.
            data = deepcopy(annotations["task"][str(tid)])
            data["shapes"][0].update({"points": [2.0, 3.0, 4.0, 5.0, 6.0, 7.0]})
            data["version"] += 1
            return data

        return get_data

    @pytest.mark.parametrize("org", [""])
    @pytest.mark.parametrize(
        "privilege, task_staff, is_allow",
        [
            ("admin", True, True),
            ("admin", False, True),
            ("business", True, True),
            ("business", False, False),
            ("worker", True, True),
            ("worker", False, False),
            ("user", True, True),
            ("user", False, False),
        ],
    )
    def test_user_update_task_annotations(
        self,
        org,
        privilege,
        task_staff,
        is_allow,
        find_task_staff_user,
        find_users,
        request_data,
        tasks_by_org,
        filter_tasks_with_shapes,
    ):
        users = find_users(privilege=privilege)
        tasks = tasks_by_org[org]
        filtered_tasks = filter_tasks_with_shapes(tasks)
        username, tid = find_task_staff_user(filtered_tasks, users, task_staff)

        data = request_data(tid)
        with make_api_client(username) as api_client:
            (_, response) = api_client.tasks_api.partial_update_annotations(
                id=tid,
                action="update",
                org=org,
                patched_labeled_data_request=deepcopy(data),
                _parse_response=False,
                _check_status=False,
            )

        self._test_check_response(is_allow, response, data)

    @pytest.mark.parametrize("org", [2])
    @pytest.mark.parametrize(
        "role, task_staff, is_allow",
        [
            ("maintainer", False, True),
            ("owner", False, True),
            ("supervisor", False, False),
            ("worker", False, False),
            ("maintainer", True, True),
            ("owner", True, True),
            ("supervisor", True, True),
            ("worker", True, True),
        ],
    )
    def test_member_update_task_annotation(
        self,
        org,
        role,
        task_staff,
        is_allow,
        find_task_staff_user,
        find_users,
        tasks_by_org,
        request_data,
    ):
        users = find_users(role=role, org=org)
        tasks = tasks_by_org[org]
        username, tid = find_task_staff_user(tasks, users, task_staff, [14])

        data = request_data(tid)
        with make_api_client(username) as api_client:
            (_, response) = api_client.tasks_api.partial_update_annotations(
                id=tid,
                org_id=org,
                action="update",
                patched_labeled_data_request=deepcopy(data),
                _parse_response=False,
                _check_status=False,
            )

        self._test_check_response(is_allow, response, data)


@pytest.mark.usefixtures("dontchangedb")
class TestGetTaskDataset:
    """Checks of task dataset export."""

    def _test_export_task(self, username, tid, **kwargs):
        """Export the dataset of task ``tid`` as ``username`` and return the response."""
        with make_api_client(username) as api_client:
            return export_dataset(api_client.tasks_api.retrieve_dataset_endpoint, id=tid, **kwargs)

    def test_can_export_task_dataset(self, admin_user, tasks_with_shapes):
        task = tasks_with_shapes[0]
        response = self._test_export_task(admin_user, task["id"], format="CVAT for images 1.1")
        assert response.data


@pytest.mark.usefixtures("changedb")
@pytest.mark.usefixtures("restore_cvat_data")
class TestPostTaskData:
    """Checks of task creation followed by a data upload."""

    _USERNAME = "admin1"

    @staticmethod
    def _wait_until_task_is_created(api: apis.TasksApi, task_id: int) -> models.RqStatus:
        """Poll the task status until it is "Finished" or "Failed".

        Polls up to 100 times, sleeping one second between attempts.

        Raises:
            Exception: if neither state is reached within the polling budget.
        """
        for _ in range(100):
            (status, _) = api.retrieve_status(task_id)
            if status.state.value in ["Finished", "Failed"]:
                return status
            sleep(1)
        raise Exception("Cannot create task")

    def _test_create_task(self, username, spec, data, content_type, **kwargs):
        """Create a task, attach ``data`` to it, wait for completion, return its id."""
        with make_api_client(username) as api_client:
            (task, response) = api_client.tasks_api.create(spec, **kwargs)
            assert response.status == HTTPStatus.CREATED

            # A copy is passed so the caller's ``data`` dict is left untouched.
            (_, response) = api_client.tasks_api.create_data(
                task.id, data_request=deepcopy(data), _content_type=content_type, **kwargs
            )
            assert response.status == HTTPStatus.ACCEPTED

            status = self._wait_until_task_is_created(api_client.tasks_api, task.id)
            assert status.state.value == "Finished"

        return task.id

    def test_can_create_task_with_defined_start_and_stop_frames(self):
        task_spec = {
            "name": f"test {self._USERNAME} to create a task with defined start and stop frames",
            "labels": [
                {
                    "name": "car",
                    "color": "#ff00ff",
                    "attributes": [
                        {
                            "name": "a",
                            "mutable": True,
                            "input_type": "number",
                            "default_value": "5",
                            "values": ["4", "5", "6"],
                        }
                    ],
                }
            ],
        }

        task_data = {
            "image_quality": 75,
            "start_frame": 2,
            "stop_frame": 5,
            "client_files": generate_image_files(7),
        }

        task_id = self._test_create_task(
            self._USERNAME, task_spec, task_data, content_type="multipart/form-data"
        )

        # check task size
        with make_api_client(self._USERNAME) as api_client:
            (task, _) = api_client.tasks_api.retrieve(task_id)
            assert task.size == 4

    def test_can_get_annotations_from_new_task_with_skeletons(self):
        spec = {
            "name": "test admin1 to create a task with skeleton",
            "labels": [
                {
                    "name": "s1",
                    "color": "#5c5eba",
                    "attributes": [],
                    "type": "skeleton",
                    "sublabels": [
                        {"name": "1", "color": "#d12345", "attributes": [], "type": "points"},
                        {"name": "2", "color": "#350dea", "attributes": [], "type": "points"},
                    ],
                    # NOTE(review): this literal evaluates to an empty string; the SVG
                    # markup may have been lost -- confirm against the intended value.
                    "svg": '' '' '',
                }
            ],
        }

        task_data = {
            "image_quality": 75,
            "client_files": generate_image_files(3),
        }

        task_id = self._test_create_task(
            self._USERNAME, spec, task_data, content_type="multipart/form-data"
        )

        response = get_method(self._USERNAME, f"tasks/{task_id}")
        # Group label ids by label type ("skeleton" / "points").
        label_ids = {}
        for label in response.json()["labels"]:
            label_ids.setdefault(label["type"], []).append(label["id"])

        job_id = response.json()["segments"][0]["jobs"][0]["id"]
        patch_data = {
            "shapes": [
                {
                    "type": "skeleton",
                    "occluded": False,
                    "outside": False,
                    "z_order": 0,
                    "rotation": 0,
                    "points": [],
                    "frame": 0,
                    "label_id": label_ids["skeleton"][0],
                    "group": 0,
                    "source": "manual",
                    "attributes": [],
                    "elements": [
                        {
                            "type": "points",
                            "occluded": False,
                            "outside": False,
                            "z_order": 0,
                            "rotation": 0,
                            "points": [131.63947368421032, 165.0868421052637],
                            "frame": 0,
                            "label_id": label_ids["points"][0],
                            "group": 0,
                            "source": "manual",
                            "attributes": [],
                        },
                        {
                            "type": "points",
                            "occluded": False,
                            "outside": False,
                            "z_order": 0,
                            "rotation": 0,
                            "points": [354.98157894736823, 304.2710526315795],
                            "frame": 0,
                            "label_id": label_ids["points"][1],
                            "group": 0,
                            "source": "manual",
                            "attributes": [],
                        },
                    ],
                }
            ],
            "tracks": [
                {
                    "frame": 0,
                    "label_id": label_ids["skeleton"][0],
                    "group": 0,
                    "source": "manual",
                    "shapes": [
                        {
                            "type": "skeleton",
                            "occluded": False,
                            "outside": False,
                            "z_order": 0,
                            "rotation": 0,
                            "points": [],
                            "frame": 0,
                            "attributes": [],
                        }
                    ],
                    "attributes": [],
                    "elements": [
                        {
                            "frame": 0,
                            "label_id": label_ids["points"][0],
                            "group": 0,
                            "source": "manual",
                            "shapes": [
                                {
                                    "type": "points",
                                    "occluded": False,
                                    "outside": False,
                                    "z_order": 0,
                                    "rotation": 0,
                                    "points": [295.6394736842103, 472.5868421052637],
                                    "frame": 0,
                                    "attributes": [],
                                }
                            ],
                            "attributes": [],
                        },
                        {
                            "frame": 0,
                            "label_id": label_ids["points"][1],
                            "group": 0,
                            "source": "manual",
                            "shapes": [
                                {
                                    "type": "points",
                                    "occluded": False,
                                    "outside": False,
                                    "z_order": 0,
                                    "rotation": 0,
                                    "points": [619.3236842105262, 846.9815789473689],
                                    "frame": 0,
                                    "attributes": [],
                                }
                            ],
                            "attributes": [],
                        },
                    ],
                }
            ],
            "tags": [],
            "version": 0,
        }

        response = patch_method(
            self._USERNAME, f"jobs/{job_id}/annotations", patch_data, action="create"
        )
        # Without this check a rejected upload would go unnoticed, because the
        # GET below succeeds even when no annotations were stored.
        assert response.status_code == HTTPStatus.OK

        response = get_method(self._USERNAME, f"jobs/{job_id}/annotations")
        assert response.status_code == HTTPStatus.OK

    @pytest.mark.parametrize(
        "cloud_storage_id, manifest, use_bucket_content, org",
        [
            (1, "manifest.jsonl", False, ""),  # public bucket
            (2, "sub/manifest.jsonl", True, "org2"),  # private bucket
        ],
    )
    def test_create_task_with_cloud_storage_files(
        self, cloud_storage_id, manifest, use_bucket_content, org
    ):
        if use_bucket_content:
            cloud_storage_content = get_cloud_storage_content(
                self._USERNAME, cloud_storage_id, manifest
            )
        else:
            cloud_storage_content = ["image_case_65_1.png", "image_case_65_2.png"]
        # The manifest itself is sent along with the data files.
        cloud_storage_content.append(manifest)

        task_spec = {
            "name": f"Task with files from cloud storage {cloud_storage_id}",
            "labels": [
                {
                    "name": "car",
                }
            ],
        }

        data_spec = {
            "image_quality": 75,
            "use_cache": True,
            "storage": "cloud_storage",
            "cloud_storage_id": cloud_storage_id,
            "server_files": cloud_storage_content,
        }

        self._test_create_task(
            self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
        )