You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
889 lines
34 KiB
Python
889 lines
34 KiB
Python
# Copyright (C) 2022 Intel Corporation
|
|
# Copyright (C) 2022-2023 CVAT.ai Corporation
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
import io
|
|
import json
|
|
import xml.etree.ElementTree as ET
|
|
import zipfile
|
|
from copy import deepcopy
|
|
from http import HTTPStatus
|
|
from io import BytesIO
|
|
from itertools import product
|
|
from time import sleep
|
|
from typing import Dict, Optional
|
|
|
|
import pytest
|
|
from cvat_sdk.api_client import ApiClient, Configuration, models
|
|
from cvat_sdk.api_client.api_client import Endpoint
|
|
from deepdiff import DeepDiff
|
|
from PIL import Image
|
|
|
|
from shared.utils.config import BASE_URL, USER_PASS, get_method, make_api_client, patch_method
|
|
|
|
from .utils import CollectionSimpleFilterTestBase, export_dataset
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_class")
class TestGetProjects:
    """Access checks for retrieving a single project through the projects API."""

    def _find_project_by_user_org(self, user, projects, is_project_staff_flag, is_project_staff):
        """Return the id of the first project whose staff status for ``user`` equals the flag.

        Returns ``None`` when no project matches.
        """
        for p in projects:
            if is_project_staff(user["id"], p["id"]) == is_project_staff_flag:
                return p["id"]
        return None

    def _test_response_200(self, username, project_id, **kwargs):
        """Retrieve the project as ``username``; expect HTTP 200 and the requested id."""
        with make_api_client(username) as api_client:
            (project, response) = api_client.projects_api.retrieve(project_id, **kwargs)
            assert response.status == HTTPStatus.OK
            assert project_id == project.id

    def _test_response_403(self, username, project_id):
        """Retrieve the project as ``username``; expect HTTP 403."""
        with make_api_client(username) as api_client:
            # Parsing and status checking are switched off; the raw status is asserted below.
            (_, response) = api_client.projects_api.retrieve(
                project_id, _parse_response=False, _check_status=False
            )
            assert response.status == HTTPStatus.FORBIDDEN

    # An admin can see any project even without owning it or being assigned to it.
    def test_project_admin_accessibility(self, projects, find_users, is_project_staff, org_staff):
        users = find_users(privilege="admin")

        # The staff check takes a project id (see the other tests in this class);
        # the organization id used to be passed here by mistake.
        user, project = next(
            (user, project)
            for user, project in product(users, projects)
            if not is_project_staff(user["id"], project["id"])
            and user["id"] not in org_staff(project["organization"])
        )
        self._test_response_200(user["username"], project["id"])

    # Project owner or project assignee can see the project.
    def test_project_owner_accessibility(self, projects):
        # Start from None so that a missing sample fails the assertions below
        # instead of raising NameError on an unbound local.
        project_with_owner = None
        project_with_assignee = None
        for p in projects:
            if p["owner"] is not None:
                project_with_owner = p
            if p["assignee"] is not None:
                project_with_assignee = p

        assert project_with_owner is not None, "no project with an owner in the test data"
        assert project_with_assignee is not None, "no project with an assignee in the test data"

        self._test_response_200(project_with_owner["owner"]["username"], project_with_owner["id"])
        self._test_response_200(
            project_with_assignee["assignee"]["username"], project_with_assignee["id"]
        )

    def test_user_cannot_see_project(self, projects, find_users, is_project_staff, org_staff):
        """A non-admin who is neither project staff nor organization staff gets 403."""
        users = find_users(exclude_privilege="admin")

        # Pass the project id (not the organization id) to the staff check.
        user, project = next(
            (user, project)
            for user, project in product(users, projects)
            if not is_project_staff(user["id"], project["id"])
            and user["id"] not in org_staff(project["organization"])
        )
        self._test_response_403(user["username"], project["id"])

    @pytest.mark.parametrize("role", ("supervisor", "worker"))
    def test_if_supervisor_or_worker_cannot_see_project(
        self, projects, is_project_staff, find_users, role
    ):
        """An org supervisor/worker who is not project staff gets 403."""
        user, pid = next(
            (
                (user, project["id"])
                for user in find_users(role=role, exclude_privilege="admin")
                for project in projects
                if project["organization"] == user["org"]
                and not is_project_staff(user["id"], project["id"])
            )
        )

        self._test_response_403(user["username"], pid)

    @pytest.mark.parametrize("role", ("maintainer", "owner"))
    def test_if_maintainer_or_owner_can_see_project(
        self, find_users, projects, is_project_staff, role
    ):
        """An org maintainer/owner can see an org project without being project staff."""
        user, pid = next(
            (
                (user, project["id"])
                for user in find_users(role=role, exclude_privilege="admin")
                for project in projects
                if project["organization"] == user["org"]
                and not is_project_staff(user["id"], project["id"])
            )
        )

        self._test_response_200(user["username"], pid, org_id=user["org"])

    @pytest.mark.parametrize("role", ("supervisor", "worker"))
    def test_if_org_member_supervisor_or_worker_can_see_project(
        self, projects, find_users, is_project_staff, role
    ):
        """An org supervisor/worker who is project staff can see the project."""
        user, pid = next(
            (
                (user, project["id"])
                for user in find_users(role=role, exclude_privilege="admin")
                for project in projects
                if project["organization"] == user["org"]
                and is_project_staff(user["id"], project["id"])
            )
        )

        self._test_response_200(user["username"], pid, org_id=user["org"])
|
|
|
|
|
|
class TestProjectsListFilters(CollectionSimpleFilterTestBase):
    """Simple-filter tests for the project list endpoint."""

    # NOTE(review): presumably read by CollectionSimpleFilterTestBase to reach the
    # nested value for a filter field — confirm against the base class.
    field_lookups = dict(
        owner=["owner", "username"],
        assignee=["assignee", "username"],
    )

    @pytest.fixture(autouse=True)
    def setup(self, restore_db_per_class, admin_user, projects):
        """Store the project samples and the admin user on the test instance."""
        self.samples = projects
        self.user = admin_user

    def _get_endpoint(self, api_client: ApiClient) -> Endpoint:
        """Return the list endpoint of the projects API for the given client."""
        projects_api = api_client.projects_api
        return projects_api.list_endpoint

    @pytest.mark.parametrize("field", ("name", "owner", "assignee", "status"))
    def test_can_use_simple_filter_for_object_list(self, field):
        """Delegate to the base-class filter test for each listed field."""
        return super().test_can_use_simple_filter_for_object_list(field)
|
|
|
|
|
|
class TestGetProjectBackup:
    """Permission tests for requesting and downloading a project backup."""

    @staticmethod
    def _find_org_member_and_project(users, projects, is_project_staff, is_org_member, *, staff):
        """Return the first ``(user, project)`` pair matching the requested staff status.

        The project must belong to an organization and the user must be a member
        of that organization. ``staff`` selects whether the user has to be project
        staff (``True``) or must not be (``False``). Raises ``StopIteration`` when
        the test data contains no such pair.
        """
        return next(
            (user, project)
            for user, project in product(users, projects)
            if bool(is_project_staff(user["id"], project["id"])) is staff
            and project["organization"]
            and is_org_member(user["id"], project["organization"])
        )

    def _test_can_get_project_backup(self, username, pid, **kwargs):
        """Poll the backup endpoint until it answers 201, then download and expect 200."""
        for _attempt in range(30):
            response = get_method(username, f"projects/{pid}/backup", **kwargs)
            response.raise_for_status()
            if response.status_code == HTTPStatus.CREATED:
                break
            sleep(1)
        else:
            # Fail here with a clear reason instead of falling through to the download.
            pytest.fail(f"backup of project {pid} was not ready after 30 attempts")

        response = get_method(username, f"projects/{pid}/backup", action="download", **kwargs)
        assert response.status_code == HTTPStatus.OK

    def _test_cannot_get_project_backup(self, username, pid, **kwargs):
        """Request the backup as ``username`` and expect HTTP 403."""
        response = get_method(username, f"projects/{pid}/backup", **kwargs)
        assert response.status_code == HTTPStatus.FORBIDDEN

    def test_admin_can_get_project_backup(self, projects):
        project = next(iter(projects))
        self._test_can_get_project_backup("admin1", project["id"])

    # A user that is not in [project:owner, project:assignee] cannot get a project backup.
    def test_user_cannot_get_project_backup(self, find_users, projects, is_project_staff):
        users = find_users(exclude_privilege="admin")

        user, project = next(
            (user, project)
            for user, project in product(users, projects)
            if not is_project_staff(user["id"], project["id"])
        )

        self._test_cannot_get_project_backup(user["username"], project["id"])

    # An org worker that is not in [project:owner, project:assignee] cannot get a project backup.
    def test_org_worker_cannot_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        users = find_users(role="worker", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=False
        )

        self._test_cannot_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    # An org worker that is in [project:owner, project:assignee] can get a project backup.
    def test_org_worker_can_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        users = find_users(role="worker", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=True
        )

        self._test_can_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    # An org supervisor that is in [project:owner, project:assignee] can get a project backup.
    def test_org_supervisor_can_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        users = find_users(role="supervisor", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=True
        )

        self._test_can_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    # An org supervisor that is not in [project:owner, project:assignee] cannot get
    # a project backup.
    def test_org_supervisor_cannot_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        users = find_users(role="supervisor", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=False
        )

        self._test_cannot_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    # An org maintainer that is not in [project:owner, project:assignee] can get
    # a project backup.
    def test_org_maintainer_can_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        users = find_users(role="maintainer", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=False
        )

        self._test_can_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )

    # An org owner that is not in [project:owner, project:assignee] can get a project backup.
    def test_org_owner_can_get_project_backup(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        users = find_users(role="owner", exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=False
        )

        self._test_can_get_project_backup(
            user["username"], project["id"], org_id=project["organization"]
        )
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_function")
class TestPostProjects:
    """Permission tests for creating projects, in the sandbox and inside organizations."""

    def _test_create_project_201(self, user, spec, **kwargs):
        """Create a project from ``spec`` as ``user`` and expect HTTP 201."""
        with make_api_client(user) as api_client:
            (_, response) = api_client.projects_api.create(spec, **kwargs)
            assert response.status == HTTPStatus.CREATED

    def _test_create_project_403(self, user, spec, **kwargs):
        """Create a project from ``spec`` as ``user`` and expect HTTP 403."""
        with make_api_client(user) as api_client:
            # Parsing and status checking are switched off; the raw status is asserted below.
            (_, response) = api_client.projects_api.create(
                spec, **kwargs, _parse_response=False, _check_status=False
            )
            assert response.status == HTTPStatus.FORBIDDEN

    def test_if_worker_cannot_create_project(self, find_users):
        workers = find_users(privilege="worker")
        assert workers, "no user with the 'worker' privilege in the test data"

        username = workers[0]["username"]
        spec = {"name": f"test {username} tries to create a project"}
        self._test_create_project_403(username, spec)

    @pytest.mark.parametrize("privilege", ("admin", "business", "user"))
    def test_if_user_can_create_project(self, find_users, privilege):
        privileged_users = find_users(privilege=privilege)
        assert privileged_users, f"no user with the {privilege!r} privilege in the test data"

        username = privileged_users[0]["username"]
        spec = {"name": f"test {username} tries to create a project"}
        self._test_create_project_201(username, spec)

    def test_if_org_worker_cannot_create_project(self, find_users):
        workers = find_users(role="worker")

        # First worker that belongs to an organization.
        worker = next(u for u in workers if u["org"])

        spec = {
            "name": f'test: worker {worker["username"]} creating a project for his organization',
        }
        self._test_create_project_403(worker["username"], spec, org_id=worker["org"])

    @pytest.mark.parametrize("role", ("supervisor", "maintainer", "owner"))
    def test_if_org_role_can_create_project(self, role, admin_user):
        # We can hit org or user limits here, so we create a new org and users
        user = self._create_user(
            ApiClient(configuration=Configuration(BASE_URL)), email="test_org_roles@localhost"
        )

        if role != "owner":
            # The admin creates the organization and invites the new user with the role.
            org = self._create_org(make_api_client(admin_user), members={user["email"]: role})
        else:
            # The new user creates the organization.
            org = self._create_org(make_api_client(user["username"]))

        spec = {
            "name": f'test: worker {user["username"]} creating a project for his organization',
        }
        self._test_create_project_201(user["username"], spec, org_id=org)

    @classmethod
    def _create_user(cls, api_client: ApiClient, email: str) -> dict:
        """Register a user named after the local part of ``email``.

        Returns the decoded JSON body of the registration response; callers read
        its ``"username"`` and ``"email"`` keys.
        """
        username = email.split("@", maxsplit=1)[0]
        with api_client:
            (_, response) = api_client.auth_api.create_register(
                models.RegisterSerializerExRequest(
                    username=username, password1=USER_PASS, password2=USER_PASS, email=email
                )
            )

        # NOTE(review): presumably drops the session cookie set by registration — confirm.
        api_client.cookies.clear()

        return json.loads(response.data)

    @classmethod
    def _create_org(cls, api_client: ApiClient, members: Optional[Dict[str, str]] = None) -> int:
        """Create the ``test_org_roles`` organization and invite ``members``.

        ``members`` maps an e-mail address to the role to invite it with. Returns
        the ``"id"`` field of the created organization.
        NOTE(review): the id is assumed to be an integer — confirm against the API schema.
        """
        with api_client:
            (_, response) = api_client.organizations_api.create(
                models.OrganizationWriteRequest(slug="test_org_roles"), _parse_response=False
            )
            org = json.loads(response.data)["id"]

            for email, role in (members or {}).items():
                api_client.invitations_api.create(
                    models.InvitationWriteRequest(role=role, email=email),
                    org_id=org,
                    _parse_response=False,
                )

        return org
|
|
|
|
|
|
def _check_cvat_for_video_project_annotations_meta(content, values_to_be_checked):
    """Assert that the exported annotations XML describes the expected project and tasks.

    ``content`` is the XML document. ``values_to_be_checked`` provides ``"pid"``
    (the project id as a string) and ``"tasks"`` (a list of dicts with ``"id"``,
    ``"name"``, ``"size"`` and ``"mode"``). ``<task>`` elements are compared with
    the expected tasks in document order.
    """
    root = ET.fromstring(content)

    # The first child of <meta> must be the <project> element.
    meta_children = list(root.find("meta"))
    project_node = meta_children[0]
    assert project_node.tag == "project"
    assert project_node.find("id").text == values_to_be_checked["pid"]

    task_nodes = list(root.iter("task"))
    expected_tasks = values_to_be_checked["tasks"]
    assert len(task_nodes) == len(expected_tasks)

    for task_node, expected in zip(task_nodes, expected_tasks):
        assert task_node.find("id").text == str(expected["id"])
        assert task_node.find("name").text == expected["name"]
        assert task_node.find("size").text == str(expected["size"])
        assert task_node.find("mode").text == expected["mode"]
        # The source only has to be present and non-empty.
        assert task_node.find("source").text
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_function")
class TestImportExportDatasetProject:
    """Round-trip tests for project dataset export/import and project backups."""

    # Bounds for polling the dataset import status (about two minutes in total).
    _IMPORT_POLL_ATTEMPTS = 240
    _IMPORT_POLL_INTERVAL = 0.5  # seconds between two status requests

    @staticmethod
    def _as_upload(content):
        """Wrap ``content`` bytes in an in-memory file named ``dataset.zip``."""
        upload = io.BytesIO(content)
        upload.name = "dataset.zip"
        return upload

    def _export_then_import(self, username, project_id, export_format, import_format):
        """Export the project dataset and import the archive back into the same project."""
        response = self._test_export_project(username, project_id, export_format)

        import_data = {
            "dataset_file": self._as_upload(response.data),
        }

        self._test_import_project(username, project_id, import_format, import_data)

    def _test_export_project(self, username, pid, format_name):
        """Export the project dataset in ``format_name``; return what ``export_dataset`` returns."""
        with make_api_client(username) as api_client:
            return export_dataset(
                api_client.projects_api.retrieve_dataset_endpoint, id=pid, format=format_name
            )

    def _export_annotations(self, username, pid, format_name):
        """Export the project annotations in ``format_name``; return what ``export_dataset`` returns."""
        with make_api_client(username) as api_client:
            return export_dataset(
                api_client.projects_api.retrieve_annotations_endpoint, id=pid, format=format_name
            )

    def _test_import_project(self, username, project_id, format_name, data):
        """Upload ``data`` as a dataset and wait until the import status answers 201.

        The upload itself must answer 202. The status is then polled a bounded
        number of times with a short pause; the test fails if it never turns 201.
        """
        with make_api_client(username) as api_client:
            (_, response) = api_client.projects_api.create_dataset(
                id=project_id,
                format=format_name,
                dataset_write_request=deepcopy(data),
                _content_type="multipart/form-data",
            )
            assert response.status == HTTPStatus.ACCEPTED

            for _attempt in range(self._IMPORT_POLL_ATTEMPTS):
                # TODO: It's better be refactored to a separate endpoint to get request status
                (_, response) = api_client.projects_api.retrieve_dataset(
                    project_id, action="import_status"
                )
                if response.status == HTTPStatus.CREATED:
                    break
                # Pause between requests instead of busy-polling the server.
                sleep(self._IMPORT_POLL_INTERVAL)
            else:
                pytest.fail(
                    f"dataset import into project {project_id} did not finish "
                    f"after {self._IMPORT_POLL_ATTEMPTS} status checks"
                )

    def _test_get_annotations_from_task(self, username, task_id):
        """Fetch the annotations of a task, expect HTTP 200 and return the decoded JSON."""
        with make_api_client(username) as api_client:
            (_, response) = api_client.tasks_api.retrieve_annotations(task_id)
            assert response.status == HTTPStatus.OK

        return json.loads(response.data)

    def test_can_import_dataset_in_org(self, admin_user):
        self._export_then_import(admin_user, 4, "CVAT for images 1.1", "CVAT 1.1")

    def test_can_export_and_import_dataset_with_skeletons_coco_keypoints(self, admin_user):
        self._export_then_import(admin_user, 5, "COCO Keypoints 1.0", "COCO Keypoints 1.0")

    def test_can_export_and_import_dataset_with_skeletons_cvat_for_images(self, admin_user):
        self._export_then_import(admin_user, 5, "CVAT for images 1.1", "CVAT 1.1")

    def test_can_export_and_import_dataset_with_skeletons_cvat_for_video(self, admin_user):
        self._export_then_import(admin_user, 5, "CVAT for video 1.1", "CVAT 1.1")

    def _test_can_get_project_backup(self, username, pid, **kwargs):
        """Poll the backup endpoint until it answers 201, then download it.

        Returns the download response after asserting that it is HTTP 200.
        """
        for _attempt in range(30):
            response = get_method(username, f"projects/{pid}/backup", **kwargs)
            response.raise_for_status()
            if response.status_code == HTTPStatus.CREATED:
                break
            sleep(1)
        else:
            # Fail here with a clear reason instead of falling through to the download.
            pytest.fail(f"backup of project {pid} was not ready after 30 attempts")

        response = get_method(username, f"projects/{pid}/backup", action="download", **kwargs)
        assert response.status_code == HTTPStatus.OK
        return response

    def test_admin_can_get_project_backup_and_create_project_by_backup(self, admin_user):
        project_id = 5
        response = self._test_can_get_project_backup(admin_user, project_id)

        import_data = {
            "project_file": self._as_upload(response.content),
        }

        with make_api_client(admin_user) as api_client:
            (_, response) = api_client.projects_api.create_backup(
                backup_write_request=deepcopy(import_data), _content_type="multipart/form-data"
            )
            assert response.status == HTTPStatus.ACCEPTED

    @pytest.mark.parametrize("format_name", ("Datumaro 1.0", "ImageNet 1.0", "PASCAL VOC 1.1"))
    def test_can_import_export_dataset_with_some_format(self, format_name):
        # https://github.com/opencv/cvat/issues/4410
        # https://github.com/opencv/cvat/issues/4850
        # https://github.com/opencv/cvat/issues/4621
        self._export_then_import("admin1", 4, format_name, format_name)

    @pytest.mark.parametrize("username, pid", [("admin1", 8)])
    @pytest.mark.parametrize(
        "anno_format, anno_file_name, check_func",
        [
            (
                "CVAT for video 1.1",
                "annotations.xml",
                _check_cvat_for_video_project_annotations_meta,
            ),
        ],
    )
    def test_exported_project_dataset_structure(
        self,
        username,
        pid,
        anno_format,
        anno_file_name,
        check_func,
        tasks,
        projects,
        annotations,
    ):
        """Export annotations and run ``check_func`` on the named file from the archive."""
        project = projects[pid]

        values_to_be_checked = {
            "pid": str(pid),
            "name": project["name"],
            "tasks": [
                {
                    "id": task["id"],
                    "name": task["name"],
                    "size": str(task["size"]),
                    "mode": task["mode"],
                }
                for task in tasks
                if task["project_id"] == project["id"]
            ],
        }

        response = self._export_annotations(username, pid, anno_format)
        assert response.data
        with zipfile.ZipFile(BytesIO(response.data)) as zip_file:
            content = zip_file.read(anno_file_name)
        check_func(content, values_to_be_checked)

    def test_can_import_export_annotations_with_rotation(self):
        # https://github.com/opencv/cvat/issues/4378
        username = "admin1"
        project_id = 4

        self._export_then_import(username, project_id, "CVAT for images 1.1", "CVAT 1.1")

        response = get_method(username, "/tasks", project_id=project_id)
        assert response.status_code == HTTPStatus.OK
        tasks = response.json()["results"]

        # The first shape of the first two listed tasks must have the same rotation.
        response_data = self._test_get_annotations_from_task(username, tasks[0]["id"])
        task1_rotation = response_data["shapes"][0]["rotation"]
        response_data = self._test_get_annotations_from_task(username, tasks[1]["id"])
        task2_rotation = response_data["shapes"][0]["rotation"]

        assert task1_rotation == task2_rotation
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_function")
class TestPatchProjectLabel:
    """Tests for changing project labels through PATCH /projects/<id>."""

    @staticmethod
    def _find_org_member_and_project(users, projects, is_project_staff, is_org_member, *, staff):
        """Return the first ``(user, project)`` pair matching the requested staff status.

        The project must belong to an organization and the user must be a member
        of that organization. ``staff`` selects whether the user has to be project
        staff (``True``) or must not be (``False``). Raises ``StopIteration`` when
        the test data contains no such pair.
        """
        return next(
            (user, project)
            for user, project in product(users, projects)
            if bool(is_project_staff(user["id"], project["id"])) is staff
            and project["organization"]
            and is_org_member(user["id"], project["organization"])
        )

    def _add_label_as_org_member(
        self, find_users, projects, is_project_staff, is_org_member, *, role, staff
    ):
        """Patch a new label as a non-admin org member with ``role``.

        Returns ``(project, response)`` so that callers can assert on the outcome.
        """
        users = find_users(role=role, exclude_privilege="admin")
        user, project = self._find_org_member_and_project(
            users, projects, is_project_staff, is_org_member, staff=staff
        )

        labels = {"name": "new name"}
        response = patch_method(
            user["username"],
            f'/projects/{project["id"]}',
            {"labels": [labels]},
            org_id=project["organization"],
        )
        return project, response

    def test_admin_can_delete_label(self, projects):
        # Work on a copy: the label dict is modified before it is sent.
        project = deepcopy(list(projects)[1])
        labels = project["labels"][0]
        labels.update({"deleted": True})
        response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [labels]})
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) - 1

    def test_admin_can_delete_skeleton_label(self, projects):
        project = deepcopy(projects[5])
        labels = project["labels"][0]
        labels.update({"deleted": True})
        response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [labels]})
        assert response.status_code == HTTPStatus.OK
        # NOTE(review): four labels are expected to disappear, presumably the
        # skeleton and its sublabels — confirm against the fixture data.
        assert len(response.json()["labels"]) == len(project["labels"]) - 4

    def test_admin_can_rename_label(self, projects):
        project = deepcopy(list(projects)[0])
        labels = project["labels"][0]
        # The rename is applied to the copy in place, so project["labels"] holds
        # the expected state for the comparison below.
        labels.update({"name": "new name"})
        response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [labels]})
        assert response.status_code == HTTPStatus.OK
        assert DeepDiff(response.json()["labels"], project["labels"], ignore_order=True) == {}

    def test_admin_can_add_label(self, projects):
        project = list(projects)[0]
        labels = {"name": "new name"}
        response = patch_method("admin1", f'/projects/{project["id"]}', {"labels": [labels]})
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) + 1

    # An org maintainer can add a label even if he is not in [project:owner, project:assignee].
    def test_org_maintainer_can_add_label(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        project, response = self._add_label_as_org_member(
            find_users, projects, is_project_staff, is_org_member, role="maintainer", staff=False
        )
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) + 1

    # An org supervisor cannot add a label.
    # (Renamed from test_org_supervisor_can_add_label: the test asserts 403.)
    def test_org_supervisor_cannot_add_label(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        _, response = self._add_label_as_org_member(
            find_users, projects, is_project_staff, is_org_member, role="supervisor", staff=False
        )
        assert response.status_code == HTTPStatus.FORBIDDEN

    # An org worker cannot add a label.
    def test_org_worker_cannot_add_label(
        self, find_users, projects, is_project_staff, is_org_member
    ):
        _, response = self._add_label_as_org_member(
            find_users, projects, is_project_staff, is_org_member, role="worker", staff=False
        )
        assert response.status_code == HTTPStatus.FORBIDDEN

    # An org worker that is in [project:owner, project:assignee] can add a label.
    def test_org_worker_can_add_label(self, find_users, projects, is_project_staff, is_org_member):
        project, response = self._add_label_as_org_member(
            find_users, projects, is_project_staff, is_org_member, role="worker", staff=True
        )
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) + 1

    # An org owner can add a label even if he is not in [project:owner, project:assignee].
    def test_org_owner_can_add_label(self, find_users, projects, is_project_staff, is_org_member):
        project, response = self._add_label_as_org_member(
            find_users, projects, is_project_staff, is_org_member, role="owner", staff=False
        )
        assert response.status_code == HTTPStatus.OK
        assert len(response.json()["labels"]) == len(project["labels"]) + 1
|
|
|
|
|
|
@pytest.mark.usefixtures("restore_db_per_class")
|
|
class TestGetProjectPreview:
|
|
def _test_response_200(self, username, project_id, **kwargs):
    """Fetch the project preview as ``username``; expect HTTP 200 and a non-empty image."""
    with make_api_client(username) as client:
        _, reply = client.projects_api.retrieve_preview(project_id, **kwargs)

        assert reply.status == HTTPStatus.OK
        # The body must decode as an image with positive dimensions.
        preview = Image.open(BytesIO(reply.data))
        width, height = preview.size
        assert width > 0 and height > 0
|
|
|
|
def _test_response_403(self, username, project_id):
    """Fetch the project preview as ``username`` and expect HTTP 403."""
    # Parsing and status checking are switched off; the raw status is asserted below.
    raw_call_options = {"_parse_response": False, "_check_status": False}
    with make_api_client(username) as client:
        _, reply = client.projects_api.retrieve_preview(project_id, **raw_call_options)
        assert reply.status == HTTPStatus.FORBIDDEN
|
|
|
|
def _test_response_404(self, username, project_id):
    """Fetch the project preview as ``username`` and expect HTTP 404."""
    # Parsing and status checking are switched off; the raw status is asserted below.
    raw_call_options = {"_parse_response": False, "_check_status": False}
    with make_api_client(username) as client:
        _, reply = client.projects_api.retrieve_preview(project_id, **raw_call_options)
        assert reply.status == HTTPStatus.NOT_FOUND
|
|
|
|
# Admin can see any project preview even he has no ownerships for this project.
|
|
def test_project_preview_admin_accessibility(
|
|
self, projects, find_users, is_project_staff, org_staff
|
|
):
|
|
users = find_users(privilege="admin")
|
|
|
|
user, project = next(
|
|
(user, project)
|
|
for user, project in product(users, projects)
|
|
if not is_project_staff(user["id"], project["organization"])
|
|
and user["id"] not in org_staff(project["organization"])
|
|
and project["tasks"]
|
|
)
|
|
self._test_response_200(user["username"], project["id"])
|
|
|
|
# Project owner or project assignee can see project preview.
|
|
def test_project_preview_owner_accessibility(self, projects):
|
|
for p in projects:
|
|
if not p["tasks"]:
|
|
continue
|
|
if p["owner"] is not None:
|
|
project_with_owner = p
|
|
if p["assignee"] is not None:
|
|
project_with_assignee = p
|
|
|
|
assert project_with_owner is not None
|
|
assert project_with_assignee is not None
|
|
|
|
self._test_response_200(project_with_owner["owner"]["username"], project_with_owner["id"])
|
|
self._test_response_200(
|
|
project_with_assignee["assignee"]["username"], project_with_assignee["id"]
|
|
)
|
|
|
|
def test_project_preview_not_found(self, projects, tasks):
|
|
for p in projects:
|
|
if any(t["project_id"] == p["id"] for t in tasks):
|
|
continue
|
|
if p["owner"] is not None:
|
|
project_with_owner = p
|
|
if p["assignee"] is not None:
|
|
project_with_assignee = p
|
|
|
|
assert project_with_owner is not None
|
|
assert project_with_assignee is not None
|
|
|
|
self._test_response_404(project_with_owner["owner"]["username"], project_with_owner["id"])
|
|
self._test_response_404(
|
|
project_with_assignee["assignee"]["username"], project_with_assignee["id"]
|
|
)
|
|
|
|
def test_user_cannot_see_project_preview(
|
|
self, projects, find_users, is_project_staff, org_staff
|
|
):
|
|
users = find_users(exclude_privilege="admin")
|
|
|
|
user, project = next(
|
|
(user, project)
|
|
for user, project in product(users, projects)
|
|
if not is_project_staff(user["id"], project["organization"])
|
|
and user["id"] not in org_staff(project["organization"])
|
|
)
|
|
self._test_response_403(user["username"], project["id"])
|
|
|
|
@pytest.mark.parametrize("role", ("supervisor", "worker"))
def test_if_supervisor_or_worker_cannot_see_project_preview(
    self, projects, is_project_staff, find_users, role
):
    """An org supervisor/worker who is not project staff gets 403 for the preview."""
    # Non-admin members with the role, paired with projects of their own
    # organization they are not staff of.
    candidates = (
        (member, proj["id"])
        for member in find_users(role=role, exclude_privilege="admin")
        for proj in projects
        if proj["organization"] == member["org"]
        and not is_project_staff(member["id"], proj["id"])
    )
    user, pid = next(candidates)

    self._test_response_403(user["username"], pid)
|
|
|
|
@pytest.mark.parametrize("role", ("maintainer", "owner"))
def test_if_maintainer_or_owner_can_see_project_preview(
    self, find_users, projects, is_project_staff, role
):
    """An org maintainer/owner gets a 200 preview without being project staff."""
    # Non-admin members with the role, paired with projects of their own
    # organization that have tasks and that they are not staff of.
    candidates = (
        (member, proj["id"])
        for member in find_users(role=role, exclude_privilege="admin")
        for proj in projects
        if proj["organization"] == member["org"]
        and not is_project_staff(member["id"], proj["id"])
        and proj["tasks"]
    )
    user, pid = next(candidates)

    self._test_response_200(user["username"], pid, org_id=user["org"])
|