# Copyright (C) 2022 Intel Corporation # Copyright (C) 2022 CVAT.ai Corporation # # SPDX-License-Identifier: MIT import io import xml.etree.ElementTree as ET import zipfile from copy import deepcopy from http import HTTPStatus from io import BytesIO from itertools import groupby, product from time import sleep import pytest from deepdiff import DeepDiff from shared.utils.config import get_method, make_api_client, patch_method from .utils import export_dataset @pytest.mark.usefixtures("restore_db_per_class") class TestGetProjects: def _find_project_by_user_org(self, user, projects, is_project_staff_flag, is_project_staff): for p in projects: if is_project_staff(user["id"], p["id"]) == is_project_staff_flag: return p["id"] def _test_response_200(self, username, project_id, **kwargs): with make_api_client(username) as api_client: (project, response) = api_client.projects_api.retrieve(project_id, **kwargs) assert response.status == HTTPStatus.OK assert project_id == project.id def _test_response_403(self, username, project_id): with make_api_client(username) as api_client: (_, response) = api_client.projects_api.retrieve( project_id, _parse_response=False, _check_status=False ) assert response.status == HTTPStatus.FORBIDDEN # Admin can see any project even he has no ownerships for this project. def test_project_admin_accessibility(self, projects, find_users, is_project_staff, org_staff): users = find_users(privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["organization"]) and user["id"] not in org_staff(project["organization"]) ) self._test_response_200(user["username"], project["id"]) # Project owner or project assignee can see project. def test_project_owner_accessibility(self, projects): for p in projects: if p["owner"] is not None: project_with_owner = p if p["assignee"] is not None: project_with_assignee = p assert project_with_owner is not None assert project_with_assignee is not None self._test_response_200(project_with_owner["owner"]["username"], project_with_owner["id"]) self._test_response_200( project_with_assignee["assignee"]["username"], project_with_assignee["id"] ) def test_user_cannot_see_project(self, projects, find_users, is_project_staff, org_staff): users = find_users(exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["organization"]) and user["id"] not in org_staff(project["organization"]) ) self._test_response_403(user["username"], project["id"]) @pytest.mark.parametrize("role", ("supervisor", "worker")) def test_if_supervisor_or_worker_cannot_see_project( self, projects, is_project_staff, find_users, role ): user, pid = next( ( (user, project["id"]) for user in find_users(role=role, exclude_privilege="admin") for project in projects if project["organization"] == user["org"] and not is_project_staff(user["id"], project["id"]) ) ) self._test_response_403(user["username"], pid) @pytest.mark.parametrize("role", ("maintainer", "owner")) def test_if_maintainer_or_owner_can_see_project( self, find_users, projects, is_project_staff, role ): user, pid = next( ( (user, project["id"]) for user in find_users(role=role, exclude_privilege="admin") for project in projects if project["organization"] == user["org"] and not is_project_staff(user["id"], project["id"]) ) ) self._test_response_200(user["username"], pid, org_id=user["org"]) @pytest.mark.parametrize("role", ("supervisor", "worker")) def test_if_org_member_supervisor_or_worker_can_see_project( self, projects, find_users, is_project_staff, role ): user, pid = next( ( (user, project["id"]) for user in find_users(role=role, exclude_privilege="admin") for project in projects if project["organization"] == user["org"] and is_project_staff(user["id"], project["id"]) ) ) self._test_response_200(user["username"], pid, org_id=user["org"]) class TestGetProjectBackup: def _test_can_get_project_backup(self, username, pid, **kwargs): for _ in range(30): response = get_method(username, f"projects/{pid}/backup", **kwargs) response.raise_for_status() if response.status_code == HTTPStatus.CREATED: break sleep(1) response = get_method(username, f"projects/{pid}/backup", action="download", **kwargs) assert response.status_code == HTTPStatus.OK def _test_cannot_get_project_backup(self, username, pid, **kwargs): response = get_method(username, f"projects/{pid}/backup", **kwargs) assert response.status_code == HTTPStatus.FORBIDDEN def test_admin_can_get_project_backup(self, projects): project = list(projects)[0] self._test_can_get_project_backup("admin1", project["id"]) # User that not in [project:owner, project:assignee] cannot get project backup. def test_user_cannot_get_project_backup(self, find_users, projects, is_project_staff): users = find_users(exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["id"]) ) self._test_cannot_get_project_backup(user["username"], project["id"]) # Org worker that not in [project:owner, project:assignee] cannot get project backup. def test_org_worker_cannot_get_project_backup( self, find_users, projects, is_project_staff, is_org_member ): users = find_users(role="worker", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) self._test_cannot_get_project_backup( user["username"], project["id"], org_id=project["organization"] ) # Org worker that in [project:owner, project:assignee] can get project backup. def test_org_worker_can_get_project_backup( self, find_users, projects, is_project_staff, is_org_member ): users = find_users(role="worker", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) self._test_can_get_project_backup( user["username"], project["id"], org_id=project["organization"] ) # Org supervisor that in [project:owner, project:assignee] can get project backup. def test_org_supervisor_can_get_project_backup( self, find_users, projects, is_project_staff, is_org_member ): users = find_users(role="supervisor", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) self._test_can_get_project_backup( user["username"], project["id"], org_id=project["organization"] ) # Org supervisor that not in [project:owner, project:assignee] cannot get project backup. def test_org_supervisor_cannot_get_project_backup( self, find_users, projects, is_project_staff, is_org_member ): users = find_users(role="supervisor", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) self._test_cannot_get_project_backup( user["username"], project["id"], org_id=project["organization"] ) # Org maintainer that not in [project:owner, project:assignee] can get project backup. def test_org_maintainer_can_get_project_backup( self, find_users, projects, is_project_staff, is_org_member ): users = find_users(role="maintainer", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) self._test_can_get_project_backup( user["username"], project["id"], org_id=project["organization"] ) # Org owner that not in [project:owner, project:assignee] can get project backup. def test_org_owner_can_get_project_backup( self, find_users, projects, is_project_staff, is_org_member ): users = find_users(role="owner", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) self._test_can_get_project_backup( user["username"], project["id"], org_id=project["organization"] ) @pytest.mark.usefixtures("restore_db_per_function") class TestPostProjects: def _test_create_project_201(self, user, spec, **kwargs): with make_api_client(user) as api_client: (_, response) = api_client.projects_api.create(spec, **kwargs) assert response.status == HTTPStatus.CREATED def _test_create_project_403(self, user, spec, **kwargs): with make_api_client(user) as api_client: (_, response) = api_client.projects_api.create( spec, **kwargs, _parse_response=False, _check_status=False ) assert response.status == HTTPStatus.FORBIDDEN def test_if_worker_cannot_create_project(self, find_users): workers = find_users(privilege="worker") assert len(workers) username = workers[0]["username"] spec = {"name": f"test {username} tries to create a project"} self._test_create_project_403(username, spec) @pytest.mark.parametrize("privilege", ("admin", "business", "user")) def test_if_user_can_create_project(self, find_users, privilege): privileged_users = find_users(privilege=privilege) assert len(privileged_users) username = privileged_users[0]["username"] spec = {"name": f"test {username} tries to create a project"} self._test_create_project_201(username, spec) def test_if_user_cannot_have_more_than_3_projects(self, projects, find_users): users = find_users(privilege="user") user_id, user_projects = next( (user_id, len(list(projects))) for user_id, projects in groupby(projects, lambda a: a["owner"]["id"]) if len(list(projects)) < 3 ) user = users[user_id] for i in range(1, 4 - user_projects): spec = { "name": f'test: {user["username"]} tries to create a project number {user_projects + i}' } self._test_create_project_201(user["username"], spec) spec = {"name": f'test {user["username"]} tries to create more than 3 projects'} self._test_create_project_403(user["username"], spec) @pytest.mark.parametrize("privilege", ("admin", "business")) def test_if_user_can_have_more_than_3_projects(self, find_users, privilege): privileged_users = find_users(privilege=privilege) assert len(privileged_users) user = privileged_users[0] for i in range(1, 5): spec = { "name": f'test: {user["username"]} with privilege {privilege} tries to create a project number {i}' } self._test_create_project_201(user["username"], spec) def test_if_org_worker_cannot_crate_project(self, find_users): workers = find_users(role="worker") worker = next(u for u in workers if u["org"]) spec = { "name": f'test: worker {worker["username"]} creating a project for his organization', } self._test_create_project_403(worker["username"], spec, org_id=worker["org"]) @pytest.mark.parametrize("role", ("supervisor", "maintainer", "owner")) def test_if_org_role_can_create_project(self, find_users, role): privileged_users = find_users(role=role) assert len(privileged_users) user = next(u for u in privileged_users if u["org"]) spec = { "name": f'test: worker {user["username"]} creating a project for his organization', } self._test_create_project_201(user["username"], spec, org_id=user["org"]) def _check_cvat_for_video_project_annotations_meta(content, values_to_be_checked): document = ET.fromstring(content) instance = list(document.find("meta"))[0] assert instance.tag == "project" assert instance.find("id").text == values_to_be_checked["pid"] assert len(list(document.iter("task"))) == len(values_to_be_checked["tasks"]) tasks = document.iter("task") for task_checking in values_to_be_checked["tasks"]: task_meta = next(tasks) assert task_meta.find("id").text == str(task_checking["id"]) assert task_meta.find("name").text == task_checking["name"] assert task_meta.find("size").text == str(task_checking["size"]) assert task_meta.find("mode").text == task_checking["mode"] assert task_meta.find("source").text @pytest.mark.usefixtures("restore_db_per_function") class TestImportExportDatasetProject: def _test_export_project(self, username, pid, format_name): with make_api_client(username) as api_client: return export_dataset( api_client.projects_api.retrieve_dataset_endpoint, id=pid, format=format_name ) def _export_annotations(self, username, pid, format_name): with make_api_client(username) as api_client: return export_dataset( api_client.projects_api.retrieve_annotations_endpoint, id=pid, format=format_name ) def _test_import_project(self, username, project_id, format_name, data): with make_api_client(username) as api_client: (_, response) = api_client.projects_api.create_dataset( id=project_id, format=format_name, dataset_write_request=deepcopy(data), _content_type="multipart/form-data", ) assert response.status == HTTPStatus.ACCEPTED while True: # TODO: It's better be refactored to a separate endpoint to get request status (_, response) = api_client.projects_api.retrieve_dataset( project_id, action="import_status" ) if response.status == HTTPStatus.CREATED: break def test_can_import_dataset_in_org(self, admin_user): project_id = 4 response = self._test_export_project(admin_user, project_id, "CVAT for images 1.1") tmp_file = io.BytesIO(response.data) tmp_file.name = "dataset.zip" import_data = { "dataset_file": tmp_file, } self._test_import_project(admin_user, project_id, "CVAT 1.1", import_data) def test_can_export_and_import_dataset_with_skeletons_coco_keypoints(self, admin_user): project_id = 5 response = self._test_export_project(admin_user, project_id, "COCO Keypoints 1.0") tmp_file = io.BytesIO(response.data) tmp_file.name = "dataset.zip" import_data = { "dataset_file": tmp_file, } self._test_import_project(admin_user, project_id, "COCO Keypoints 1.0", import_data) def test_can_export_and_import_dataset_with_skeletons_cvat_for_images(self, admin_user): project_id = 5 response = self._test_export_project(admin_user, project_id, "CVAT for images 1.1") tmp_file = io.BytesIO(response.data) tmp_file.name = "dataset.zip" import_data = { "dataset_file": tmp_file, } self._test_import_project(admin_user, project_id, "CVAT 1.1", import_data) def test_can_export_and_import_dataset_with_skeletons_cvat_for_video(self, admin_user): project_id = 5 response = self._test_export_project(admin_user, project_id, "CVAT for video 1.1") tmp_file = io.BytesIO(response.data) tmp_file.name = "dataset.zip" import_data = { "dataset_file": tmp_file, } self._test_import_project(admin_user, project_id, "CVAT 1.1", import_data) def _test_can_get_project_backup(self, username, pid, **kwargs): for _ in range(30): response = get_method(username, f"projects/{pid}/backup", **kwargs) response.raise_for_status() if response.status_code == HTTPStatus.CREATED: break sleep(1) response = get_method(username, f"projects/{pid}/backup", action="download", **kwargs) assert response.status_code == HTTPStatus.OK return response def test_admin_can_get_project_backup_and_create_project_by_backup(self, admin_user): project_id = 5 response = self._test_can_get_project_backup(admin_user, project_id) tmp_file = io.BytesIO(response.content) tmp_file.name = "dataset.zip" import_data = { "project_file": tmp_file, } with make_api_client(admin_user) as api_client: (_, response) = api_client.projects_api.create_backup( backup_write_request=deepcopy(import_data), _content_type="multipart/form-data" ) assert response.status == HTTPStatus.ACCEPTED @pytest.mark.parametrize("format_name", ("Datumaro 1.0", "ImageNet 1.0", "PASCAL VOC 1.1")) def test_can_import_export_dataset_with_some_format(self, format_name): # https://github.com/opencv/cvat/issues/4410 # https://github.com/opencv/cvat/issues/4850 # https://github.com/opencv/cvat/issues/4621 username = "admin1" project_id = 4 response = self._test_export_project(username, project_id, format_name) tmp_file = io.BytesIO(response.data) tmp_file.name = "dataset.zip" import_data = { "dataset_file": tmp_file, } self._test_import_project(username, project_id, format_name, import_data) @pytest.mark.parametrize("username, pid", [("admin1", 8)]) @pytest.mark.parametrize( "anno_format, anno_file_name, check_func", [ ( "CVAT for video 1.1", "annotations.xml", _check_cvat_for_video_project_annotations_meta, ), ], ) def test_exported_project_dataset_structure( self, username, pid, anno_format, anno_file_name, check_func, tasks, projects, annotations, ): project = projects[pid] values_to_be_checked = { "pid": str(pid), "name": project["name"], "tasks": [ { "id": tid, "name": (task := tasks[tid])["name"], "size": str(task["size"]), "mode": task["mode"], } for tid in project["tasks"] ], } response = self._export_annotations(username, pid, anno_format) assert response.data with zipfile.ZipFile(BytesIO(response.data)) as zip_file: content = zip_file.read(anno_file_name) check_func(content, values_to_be_checked) @pytest.mark.usefixtures("restore_db_per_function") class TestPatchProjectLabel: def test_admin_can_delete_label(self, projects): project = deepcopy(list(projects)[1]) labels = project["labels"][0] labels.update({"deleted": True}) response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [labels]}) assert response.status_code == HTTPStatus.OK assert len(response.json()["labels"]) == len(project["labels"]) - 1 def test_admin_can_delete_skeleton_label(self, projects): project = deepcopy(projects[5]) labels = project["labels"][0] labels.update({"deleted": True}) response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [labels]}) assert response.status_code == HTTPStatus.OK assert len(response.json()["labels"]) == len(project["labels"]) - 4 def test_admin_can_rename_label(self, projects): project = deepcopy(list(projects)[0]) labels = project["labels"][0] labels.update({"name": "new name"}) response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [labels]}) assert response.status_code == HTTPStatus.OK assert DeepDiff(response.json()["labels"], project["labels"], ignore_order=True) == {} def test_admin_can_add_label(self, projects): project = list(projects)[0] labels = {"name": "new name"} response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [labels]}) assert response.status_code == HTTPStatus.OK assert len(response.json()["labels"]) == len(project["labels"]) + 1 # Org maintainer can add label even he is not in [project:owner, project:assignee] def test_org_maintainer_can_add_label( self, find_users, projects, is_project_staff, is_org_member ): users = find_users(role="maintainer", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) labels = {"name": "new name"} response = patch_method( user["username"], f'/projects/{project["id"]}', {"labels": [labels]}, org_id=project["organization"], ) assert response.status_code == HTTPStatus.OK assert len(response.json()["labels"]) == len(project["labels"]) + 1 # Org supervisor cannot add label def test_org_supervisor_can_add_label( self, find_users, projects, is_project_staff, is_org_member ): users = find_users(role="supervisor", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) labels = {"name": "new name"} response = patch_method( user["username"], f'/projects/{project["id"]}', {"labels": [labels]}, org_id=project["organization"], ) assert response.status_code == HTTPStatus.FORBIDDEN # Org worker cannot add label def test_org_worker_cannot_add_label( self, find_users, projects, is_project_staff, is_org_member ): users = find_users(role="worker", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) labels = {"name": "new name"} response = patch_method( user["username"], f'/projects/{project["id"]}', {"labels": [labels]}, org_id=project["organization"], ) assert response.status_code == HTTPStatus.FORBIDDEN # Org worker that in [project:owner, project:assignee] can add label def test_org_worker_can_add_label(self, find_users, projects, is_project_staff, is_org_member): users = find_users(role="worker", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) labels = {"name": "new name"} response = patch_method( user["username"], f'/projects/{project["id"]}', {"labels": [labels]}, org_id=project["organization"], ) assert response.status_code == HTTPStatus.OK assert len(response.json()["labels"]) == len(project["labels"]) + 1 # Org owner can add label even he is not in [project:owner, project:assignee] def test_org_owner_can_add_label(self, find_users, projects, is_project_staff, is_org_member): users = find_users(role="owner", exclude_privilege="admin") user, project = next( (user, project) for user, project in product(users, projects) if not is_project_staff(user["id"], project["id"]) and project["organization"] and is_org_member(user["id"], project["organization"]) ) labels = {"name": "new name"} response = patch_method( user["username"], f'/projects/{project["id"]}', {"labels": [labels]}, org_id=project["organization"], ) assert response.status_code == HTTPStatus.OK assert len(response.json()["labels"]) == len(project["labels"]) + 1