You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

732 lines
28 KiB
Python

# Copyright (C) 2022 Intel Corporation
# Copyright (C) 2022 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
import io
import json
import xml.etree.ElementTree as ET
import zipfile
from copy import deepcopy
from http import HTTPStatus
from io import BytesIO
from itertools import groupby, product
from time import sleep
import pytest
from deepdiff import DeepDiff
from shared.utils.config import get_method, make_api_client, patch_method
from .utils import export_dataset
@pytest.mark.usefixtures("restore_db_per_class")
class TestGetProjects:
    """Access checks for retrieving a single project through the projects API."""

    def _find_project_by_user_org(self, user, projects, is_project_staff_flag, is_project_staff):
        """Return the id of the first project whose staff status for ``user`` equals the flag.

        Returns ``None`` when no project matches.
        """
        for p in projects:
            if is_project_staff(user["id"], p["id"]) == is_project_staff_flag:
                return p["id"]
        return None

    def _test_response_200(self, username, project_id, **kwargs):
        """Retrieve the project as ``username``; expect HTTP 200 and the requested id."""
        with make_api_client(username) as api_client:
            (project, response) = api_client.projects_api.retrieve(project_id, **kwargs)
            assert response.status == HTTPStatus.OK
            assert project_id == project.id

    def _test_response_403(self, username, project_id):
        """Retrieve the project as ``username``; expect HTTP 403."""
        with make_api_client(username) as api_client:
            # Parsing and status checking are disabled so the error status can be inspected.
            (_, response) = api_client.projects_api.retrieve(
                project_id, _parse_response=False, _check_status=False
            )
            assert response.status == HTTPStatus.FORBIDDEN

    def test_project_admin_accessibility(self, projects, find_users, is_project_staff, org_staff):
        """An admin can see a project even without any ownership of that project."""
        users = find_users(privilege="admin")

        # is_project_staff is called with a project id everywhere else in this module;
        # the organization id was passed here by mistake.
        user, project = next(
            (user, project)
            for user, project in product(users, projects)
            if not is_project_staff(user["id"], project["id"])
            and user["id"] not in org_staff(project["organization"])
        )
        self._test_response_200(user["username"], project["id"])

    def test_project_owner_accessibility(self, projects):
        """The project owner and the project assignee can both see the project."""
        # Initialise explicitly so that a fixture set without a match fails on the
        # assertions below instead of raising UnboundLocalError.
        project_with_owner = None
        project_with_assignee = None
        for p in projects:
            if p["owner"] is not None:
                project_with_owner = p
            if p["assignee"] is not None:
                project_with_assignee = p

        assert project_with_owner is not None
        assert project_with_assignee is not None

        self._test_response_200(project_with_owner["owner"]["username"], project_with_owner["id"])
        self._test_response_200(
            project_with_assignee["assignee"]["username"], project_with_assignee["id"]
        )

    def test_user_cannot_see_project(self, projects, find_users, is_project_staff, org_staff):
        """A non-admin who is neither project staff nor organization staff gets 403."""
        users = find_users(exclude_privilege="admin")

        # Same correction as in test_project_admin_accessibility: check staff status
        # against the project id, not the organization id.
        user, project = next(
            (user, project)
            for user, project in product(users, projects)
            if not is_project_staff(user["id"], project["id"])
            and user["id"] not in org_staff(project["organization"])
        )
        self._test_response_403(user["username"], project["id"])

    @pytest.mark.parametrize("role", ("supervisor", "worker"))
    def test_if_supervisor_or_worker_cannot_see_project(
        self, projects, is_project_staff, find_users, role
    ):
        """An org supervisor/worker who is not project staff cannot see the org's project."""
        user, pid = next(
            (
                (user, project["id"])
                for user in find_users(role=role, exclude_privilege="admin")
                for project in projects
                if project["organization"] == user["org"]
                and not is_project_staff(user["id"], project["id"])
            )
        )
        self._test_response_403(user["username"], pid)

    @pytest.mark.parametrize("role", ("maintainer", "owner"))
    def test_if_maintainer_or_owner_can_see_project(
        self, find_users, projects, is_project_staff, role
    ):
        """An org maintainer/owner can see the org's project without being project staff."""
        user, pid = next(
            (
                (user, project["id"])
                for user in find_users(role=role, exclude_privilege="admin")
                for project in projects
                if project["organization"] == user["org"]
                and not is_project_staff(user["id"], project["id"])
            )
        )
        self._test_response_200(user["username"], pid, org_id=user["org"])

    @pytest.mark.parametrize("role", ("supervisor", "worker"))
    def test_if_org_member_supervisor_or_worker_can_see_project(
        self, projects, find_users, is_project_staff, role
    ):
        """An org supervisor/worker who is project staff can see the org's project."""
        user, pid = next(
            (
                (user, project["id"])
                for user in find_users(role=role, exclude_privilege="admin")
                for project in projects
                if project["organization"] == user["org"]
                and is_project_staff(user["id"], project["id"])
            )
        )
        self._test_response_200(user["username"], pid, org_id=user["org"])
class TestGetProjectBackup:
    """Access checks for requesting and downloading a project backup."""

    # Number of one-second polls to wait for the server to report the backup as created.
    _BACKUP_POLL_ATTEMPTS = 30

    @staticmethod
    def _find_org_member_and_project(users, projects, is_project_staff, is_org_member, *, staff):
        """Return the first (user, project) pair inside an organization.

        The project must belong to an organization the user is a member of, and
        the user's project-staff status must equal ``staff``.
        Raises StopIteration when the fixtures contain no such pair.
        """
        return next(
            (user, project)
            for user, project in product(users, projects)
            if bool(is_project_staff(user["id"], project["id"])) is staff
            and project["organization"]
            and is_org_member(user["id"], project["organization"])
        )

    def _test_can_get_project_backup(self, username, pid, **kwargs):
        """Poll the backup endpoint until it reports CREATED, then download the backup.

        Returns the download response so callers may inspect its content.
        """
        for _ in range(self._BACKUP_POLL_ATTEMPTS):
            response = get_method(username, f"projects/{pid}/backup", **kwargs)
            response.raise_for_status()
            if response.status_code == HTTPStatus.CREATED:
                break
            sleep(1)
        else:
            # Fail with an explicit reason instead of falling through to the download.
            pytest.fail(
                f"backup of project {pid} was not created "
                f"after {self._BACKUP_POLL_ATTEMPTS} attempts"
            )

        response = get_method(username, f"projects/{pid}/backup", action="download", **kwargs)
        assert response.status_code == HTTPStatus.OK
        return response

    def _test_cannot_get_project_backup(self, username, pid, **kwargs):
        """Request a backup as ``username`` and expect HTTP 403."""
        response = get_method(username, f"projects/{pid}/backup", **kwargs)
        assert response.status_code == HTTPStatus.FORBIDDEN

    def test_admin_can_get_project_backup(self, projects):
        """The admin1 account can back up the first project in the fixture set."""
        project = list(projects)[0]
        self._test_can_get_project_backup("admin1", project["id"])

    def test_user_cannot_get_project_backup(self, find_users, projects, is_project_staff):
        """A user that is not in [project:owner, project:assignee] cannot get a backup."""
        users = find_users(exclude_privilege="admin")

        user, project = next(
            (user, project)
            for user, project in product(users, projects)
            if not is_project_staff(user["id"], project["id"])
        )
        self._test_cannot_get_project_backup(user["username"], project["id"])

    def test_org_worker_cannot_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        """An org worker that is not in [project:owner, project:assignee] cannot get a backup."""
        users = find_users(role="worker", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=False
        )
        self._test_cannot_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    def test_org_worker_can_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        """An org worker that is in [project:owner, project:assignee] can get a backup."""
        users = find_users(role="worker", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=True
        )
        self._test_can_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    def test_org_supervisor_can_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        """An org supervisor that is in [project:owner, project:assignee] can get a backup."""
        users = find_users(role="supervisor", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=True
        )
        self._test_can_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    def test_org_supervisor_cannot_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        """An org supervisor that is not in [project:owner, project:assignee] cannot get a backup."""
        users = find_users(role="supervisor", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=False
        )
        self._test_cannot_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    def test_org_maintainer_can_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        """An org maintainer that is not in [project:owner, project:assignee] can get a backup."""
        users = find_users(role="maintainer", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=False
        )
        self._test_can_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    def test_org_owner_can_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        """An org owner that is not in [project:owner, project:assignee] can get a backup."""
        users = find_users(role="owner", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=False
        )
        self._test_can_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )
@pytest.mark.usefixtures("restore_db_per_function")
class TestPostProjects:
    """Permission and quota checks for creating projects."""

    def _test_create_project_201(self, user, spec, **kwargs):
        """Create a project as ``user`` and expect HTTP 201."""
        with make_api_client(user) as api_client:
            (_, response) = api_client.projects_api.create(spec, **kwargs)
            assert response.status == HTTPStatus.CREATED

    def _test_create_project_403(self, user, spec, **kwargs):
        """Create a project as ``user`` and expect HTTP 403."""
        with make_api_client(user) as api_client:
            # Parsing and status checking are disabled so the error status can be inspected.
            (_, response) = api_client.projects_api.create(
                spec, **kwargs, _parse_response=False, _check_status=False
            )
            assert response.status == HTTPStatus.FORBIDDEN

    def test_if_worker_cannot_create_project(self, find_users):
        """A user with the "worker" privilege cannot create a project."""
        workers = find_users(privilege="worker")
        assert len(workers)

        username = workers[0]["username"]
        spec = {"name": f"test {username} tries to create a project"}
        self._test_create_project_403(username, spec)

    @pytest.mark.parametrize("privilege", ("admin", "business", "user"))
    def test_if_user_can_create_project(self, find_users, privilege):
        """Users with the admin, business or user privilege can create a project."""
        privileged_users = find_users(privilege=privilege)
        assert len(privileged_users)

        username = privileged_users[0]["username"]
        spec = {"name": f"test {username} tries to create a project"}
        self._test_create_project_201(username, spec)

    def test_if_user_cannot_have_more_than_3_projects(self, projects, find_users):
        """A user with the "user" privilege is refused once three projects are owned."""
        users = find_users(privilege="user")

        # Count existing projects per owner. The previous groupby-based version
        # exhausted each group in its filter (so the reported count was always 0),
        # grouped unsorted data, and indexed the user list by user id.
        owned = {}
        for project in projects:
            owner = project["owner"]
            if owner is not None:
                owned[owner["id"]] = owned.get(owner["id"], 0) + 1

        user = next(u for u in users if owned.get(u["id"], 0) < 3)
        user_projects = owned.get(user["id"], 0)

        # Top the user up to exactly three owned projects...
        for i in range(1, 4 - user_projects):
            spec = {
                "name": f'test: {user["username"]} tries to create a project number {user_projects + i}'
            }
            self._test_create_project_201(user["username"], spec)

        # ...so that the next creation attempt must be rejected.
        spec = {"name": f'test {user["username"]} tries to create more than 3 projects'}
        self._test_create_project_403(user["username"], spec)

    @pytest.mark.parametrize("privilege", ("admin", "business"))
    def test_if_user_can_have_more_than_3_projects(self, find_users, privilege):
        """Users with the admin or business privilege can create four projects in a row."""
        privileged_users = find_users(privilege=privilege)
        assert len(privileged_users)

        user = privileged_users[0]

        for i in range(1, 5):
            spec = {
                "name": f'test: {user["username"]} with privilege {privilege} tries to create a project number {i}'
            }
            self._test_create_project_201(user["username"], spec)

    def test_if_org_worker_cannot_crate_project(self, find_users):
        """A user with the "worker" role cannot create a project in their organization."""
        workers = find_users(role="worker")

        worker = next(u for u in workers if u["org"])

        spec = {
            "name": f'test: worker {worker["username"]} creating a project for his organization',
        }
        self._test_create_project_403(worker["username"], spec, org_id=worker["org"])

    @pytest.mark.parametrize("role", ("supervisor", "maintainer", "owner"))
    def test_if_org_role_can_create_project(self, find_users, role):
        """Supervisors, maintainers and owners can create a project in their organization."""
        privileged_users = find_users(role=role)
        assert len(privileged_users)

        user = next(u for u in privileged_users if u["org"])

        spec = {
            "name": f'test: worker {user["username"]} creating a project for his organization',
        }
        self._test_create_project_201(user["username"], spec, org_id=user["org"])
def _check_cvat_for_video_project_annotations_meta(content, values_to_be_checked):
document = ET.fromstring(content)
instance = list(document.find("meta"))[0]
assert instance.tag == "project"
assert instance.find("id").text == values_to_be_checked["pid"]
assert len(list(document.iter("task"))) == len(values_to_be_checked["tasks"])
tasks = document.iter("task")
for task_checking in values_to_be_checked["tasks"]:
task_meta = next(tasks)
assert task_meta.find("id").text == str(task_checking["id"])
assert task_meta.find("name").text == task_checking["name"]
assert task_meta.find("size").text == str(task_checking["size"])
assert task_meta.find("mode").text == task_checking["mode"]
assert task_meta.find("source").text
@pytest.mark.usefixtures("restore_db_per_function")
class TestImportExportDatasetProject:
    """Round-trip tests for exporting and importing project datasets and backups."""

    # Polling limits for the dataset import status. The loop used to spin
    # without any pause or upper bound, which could hang the test run forever.
    _IMPORT_POLL_ATTEMPTS = 240
    _IMPORT_POLL_INTERVAL = 0.5  # seconds

    # Number of one-second polls to wait for a project backup to be created.
    _BACKUP_POLL_ATTEMPTS = 30

    @staticmethod
    def _as_upload(payload, file_name="dataset.zip"):
        """Wrap raw bytes in a named in-memory file suitable for a multipart upload."""
        upload = io.BytesIO(payload)
        upload.name = file_name
        return upload

    def _test_export_project(self, username, pid, format_name):
        """Export the project dataset in ``format_name`` and return the response."""
        with make_api_client(username) as api_client:
            return export_dataset(
                api_client.projects_api.retrieve_dataset_endpoint, id=pid, format=format_name
            )

    def _export_annotations(self, username, pid, format_name):
        """Export the project annotations in ``format_name`` and return the response."""
        with make_api_client(username) as api_client:
            return export_dataset(
                api_client.projects_api.retrieve_annotations_endpoint, id=pid, format=format_name
            )

    def _test_import_project(self, username, project_id, format_name, data):
        """Upload ``data`` as a dataset into the project and wait for the import to finish.

        Fails the test if the import status does not become CREATED within
        ``_IMPORT_POLL_ATTEMPTS`` polls.
        """
        with make_api_client(username) as api_client:
            (_, response) = api_client.projects_api.create_dataset(
                id=project_id,
                format=format_name,
                # Copy so the caller's file object is not consumed by the upload.
                dataset_write_request=deepcopy(data),
                _content_type="multipart/form-data",
            )
            assert response.status == HTTPStatus.ACCEPTED

            for _ in range(self._IMPORT_POLL_ATTEMPTS):
                # TODO: It's better be refactored to a separate endpoint to get request status
                (_, response) = api_client.projects_api.retrieve_dataset(
                    project_id, action="import_status"
                )
                if response.status == HTTPStatus.CREATED:
                    break
                sleep(self._IMPORT_POLL_INTERVAL)
            else:
                pytest.fail(
                    f"dataset import into project {project_id} did not finish "
                    f"after {self._IMPORT_POLL_ATTEMPTS} status checks"
                )

    def _test_export_then_import(self, username, project_id, export_format, import_format):
        """Export the project dataset and import the resulting archive back into it."""
        response = self._test_export_project(username, project_id, export_format)
        import_data = {
            "dataset_file": self._as_upload(response.data),
        }
        self._test_import_project(username, project_id, import_format, import_data)

    def _test_get_annotations_from_task(self, username, task_id):
        """Return the task annotations as parsed JSON, expecting HTTP 200."""
        with make_api_client(username) as api_client:
            (_, response) = api_client.tasks_api.retrieve_annotations(task_id)
            assert response.status == HTTPStatus.OK

            response_data = json.loads(response.data)
        return response_data

    def test_can_import_dataset_in_org(self, admin_user):
        """Project 4 can be exported as "CVAT for images 1.1" and re-imported as "CVAT 1.1"."""
        project_id = 4
        self._test_export_then_import(admin_user, project_id, "CVAT for images 1.1", "CVAT 1.1")

    def test_can_export_and_import_dataset_with_skeletons_coco_keypoints(self, admin_user):
        """Project 5 round-trips through the "COCO Keypoints 1.0" format."""
        project_id = 5
        self._test_export_then_import(
            admin_user, project_id, "COCO Keypoints 1.0", "COCO Keypoints 1.0"
        )

    def test_can_export_and_import_dataset_with_skeletons_cvat_for_images(self, admin_user):
        """Project 5 can be exported as "CVAT for images 1.1" and re-imported as "CVAT 1.1"."""
        project_id = 5
        self._test_export_then_import(admin_user, project_id, "CVAT for images 1.1", "CVAT 1.1")

    def test_can_export_and_import_dataset_with_skeletons_cvat_for_video(self, admin_user):
        """Project 5 can be exported as "CVAT for video 1.1" and re-imported as "CVAT 1.1"."""
        project_id = 5
        self._test_export_then_import(admin_user, project_id, "CVAT for video 1.1", "CVAT 1.1")

    def _test_can_get_project_backup(self, username, pid, **kwargs):
        """Poll the backup endpoint until it reports CREATED, then download the backup.

        Returns the download response.
        """
        for _ in range(self._BACKUP_POLL_ATTEMPTS):
            response = get_method(username, f"projects/{pid}/backup", **kwargs)
            response.raise_for_status()
            if response.status_code == HTTPStatus.CREATED:
                break
            sleep(1)
        else:
            # Fail with an explicit reason instead of falling through to the download.
            pytest.fail(
                f"backup of project {pid} was not created "
                f"after {self._BACKUP_POLL_ATTEMPTS} attempts"
            )

        response = get_method(username, f"projects/{pid}/backup", action="download", **kwargs)
        assert response.status_code == HTTPStatus.OK
        return response

    def test_admin_can_get_project_backup_and_create_project_by_backup(self, admin_user):
        """A downloaded backup of project 5 is accepted when uploaded as a new project."""
        project_id = 5
        response = self._test_can_get_project_backup(admin_user, project_id)

        import_data = {
            "project_file": self._as_upload(response.content),
        }

        with make_api_client(admin_user) as api_client:
            (_, response) = api_client.projects_api.create_backup(
                backup_write_request=deepcopy(import_data), _content_type="multipart/form-data"
            )
            assert response.status == HTTPStatus.ACCEPTED

    @pytest.mark.parametrize("format_name", ("Datumaro 1.0", "ImageNet 1.0", "PASCAL VOC 1.1"))
    def test_can_import_export_dataset_with_some_format(self, format_name):
        """Project 4 round-trips through each listed format."""
        # https://github.com/opencv/cvat/issues/4410
        # https://github.com/opencv/cvat/issues/4850
        # https://github.com/opencv/cvat/issues/4621
        username = "admin1"
        project_id = 4
        self._test_export_then_import(username, project_id, format_name, format_name)

    @pytest.mark.parametrize("username, pid", [("admin1", 8)])
    @pytest.mark.parametrize(
        "anno_format, anno_file_name, check_func",
        [
            (
                "CVAT for video 1.1",
                "annotations.xml",
                _check_cvat_for_video_project_annotations_meta,
            ),
        ],
    )
    def test_exported_project_dataset_structure(
        self,
        username,
        pid,
        anno_format,
        anno_file_name,
        check_func,
        tasks,
        projects,
        annotations,
    ):
        """The exported annotation archive contains a file whose metadata matches the project."""
        project = projects[pid]

        values_to_be_checked = {
            "pid": str(pid),
            "name": project["name"],
            "tasks": [
                {
                    "id": tid,
                    "name": (task := tasks[tid])["name"],
                    "size": str(task["size"]),
                    "mode": task["mode"],
                }
                for tid in project["tasks"]
            ],
        }

        response = self._export_annotations(username, pid, anno_format)
        assert response.data
        with zipfile.ZipFile(BytesIO(response.data)) as zip_file:
            content = zip_file.read(anno_file_name)
        check_func(content, values_to_be_checked)

    def test_can_import_export_annotations_with_rotation(self):
        """After a round trip, the first shape of the first two tasks has the same rotation."""
        # https://github.com/opencv/cvat/issues/4378
        username = "admin1"
        project_id = 4

        self._test_export_then_import(username, project_id, "CVAT for images 1.1", "CVAT 1.1")

        response = get_method(username, f"/projects/{project_id}/tasks")
        assert response.status_code == HTTPStatus.OK

        tasks = response.json()["results"]

        response_data = self._test_get_annotations_from_task(username, tasks[0]["id"])
        task1_rotation = response_data["shapes"][0]["rotation"]
        response_data = self._test_get_annotations_from_task(username, tasks[1]["id"])
        task2_rotation = response_data["shapes"][0]["rotation"]

        assert task1_rotation == task2_rotation
@pytest.mark.usefixtures("restore_db_per_function")
class TestPatchProjectLabel:
    """Checks for adding, renaming and deleting project labels via PATCH /projects/{id}."""

    @staticmethod
    def _pick_org_member(users, projects, is_project_staff, is_org_member, expect_staff):
        """Return the first (user, project) pair inside an organization.

        The project must belong to an organization the user is a member of, and
        the truthiness of the user's project-staff status must equal ``expect_staff``.
        """
        return next(
            (candidate, proj)
            for candidate, proj in product(users, projects)
            if bool(is_project_staff(candidate["id"], proj["id"])) == expect_staff
            and proj["organization"]
            and is_org_member(candidate["id"], proj["organization"])
        )

    @staticmethod
    def _add_label_in_org(user, project):
        """PATCH a single new label into ``project`` as ``user`` within the project's organization."""
        new_label = {"name": "new name"}
        return patch_method(
            user["username"],
            f'/projects/{project["id"]}',
            {"labels": [new_label]},
            org_id=project["organization"],
        )

    def test_admin_can_delete_label(self, projects):
        """Marking a label as deleted removes exactly one label from the project."""
        project = deepcopy(list(projects)[1])
        target = project["labels"][0]
        target["deleted"] = True

        response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [target]})
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) - 1

    def test_admin_can_delete_skeleton_label(self, projects):
        """Deleting the first label of project 5 removes four labels in total."""
        project = deepcopy(projects[5])
        target = project["labels"][0]
        target["deleted"] = True

        response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [target]})
        assert response.status_code == HTTPStatus.OK
        # NOTE(review): presumably the skeleton label goes away together with three
        # sublabels, hence 4 -- confirm against the fixture data.
        assert len(response.json()["labels"]) == len(project["labels"]) - 4

    def test_admin_can_rename_label(self, projects):
        """Renaming a label is reflected in the labels returned by the server."""
        project = deepcopy(list(projects)[0])
        target = project["labels"][0]
        # The local copy is modified in place so it doubles as the expected result.
        target["name"] = "new name"

        response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [target]})
        assert response.status_code == HTTPStatus.OK
        assert DeepDiff(response.json()["labels"], project["labels"], ignore_order=True) == {}

    def test_admin_can_add_label(self, projects):
        """Patching in a new label grows the project's label list by one."""
        project = list(projects)[0]
        new_label = {"name": "new name"}

        response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [new_label]})
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) + 1

    def test_org_maintainer_can_add_label(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        """An org maintainer can add a label even when not in [project:owner, project:assignee]."""
        maintainers = find_users(role="maintainer", exclude_privilege="admin")
        user, project = self._pick_org_member(
            maintainers, projects, is_project_staff, is_org_member, False
        )

        response = self._add_label_in_org(user, project)
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) + 1

    def test_org_supervisor_can_add_label(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        """An org supervisor who is not project staff cannot add a label.

        Despite the method name, the expected status here is 403.
        """
        supervisors = find_users(role="supervisor", exclude_privilege="admin")
        user, project = self._pick_org_member(
            supervisors, projects, is_project_staff, is_org_member, False
        )

        response = self._add_label_in_org(user, project)
        assert response.status_code == HTTPStatus.FORBIDDEN

    def test_org_worker_cannot_add_label(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        """An org worker who is not project staff cannot add a label."""
        workers = find_users(role="worker", exclude_privilege="admin")
        user, project = self._pick_org_member(
            workers, projects, is_project_staff, is_org_member, False
        )

        response = self._add_label_in_org(user, project)
        assert response.status_code == HTTPStatus.FORBIDDEN

    def test_org_worker_can_add_label(self, find_users, projects, is_project_staff, is_org_member):
        """An org worker that is in [project:owner, project:assignee] can add a label."""
        workers = find_users(role="worker", exclude_privilege="admin")
        user, project = self._pick_org_member(
            workers, projects, is_project_staff, is_org_member, True
        )

        response = self._add_label_in_org(user, project)
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) + 1

    def test_org_owner_can_add_label(self, find_users, projects, is_project_staff, is_org_member):
        """An org owner can add a label even when not in [project:owner, project:assignee]."""
        owners = find_users(role="owner", exclude_privilege="admin")
        user, project = self._pick_org_member(
            owners, projects, is_project_staff, is_org_member, False
        )

        response = self._add_label_in_org(user, project)
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) + 1