You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

890 lines
34 KiB
Python

# Copyright (C) 2022 Intel Corporation
# Copyright (C) 2022 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
import io
import json
import os.path as osp
import subprocess
from copy import deepcopy
from functools import partial
from http import HTTPStatus
from tempfile import TemporaryDirectory
from time import sleep
import pytest
from cvat_sdk.api_client import apis, models
from cvat_sdk.core.helpers import get_paginated_collection
from deepdiff import DeepDiff
from PIL import Image
import shared.utils.s3 as s3
from shared.fixtures.init import get_server_image_tag
from shared.utils.config import get_method, make_api_client, patch_method
from shared.utils.helpers import generate_image_files
from .utils import export_dataset
def get_cloud_storage_content(username, cloud_storage_id, manifest):
with make_api_client(username) as api_client:
(data, _) = api_client.cloudstorages_api.retrieve_content(
cloud_storage_id, manifest_path=manifest
)
return data
@pytest.mark.usefixtures("restore_db_per_class")
class TestGetTasks:
def _test_task_list_200(self, user, project_id, data, exclude_paths="", **kwargs):
with make_api_client(user) as api_client:
results = get_paginated_collection(
api_client.projects_api.list_tasks_endpoint,
return_json=True,
id=project_id,
**kwargs,
)
assert DeepDiff(data, results, ignore_order=True, exclude_paths=exclude_paths) == {}
def _test_task_list_403(self, user, project_id, **kwargs):
with make_api_client(user) as api_client:
(_, response) = api_client.projects_api.list_tasks(
project_id, **kwargs, _parse_response=False, _check_status=False
)
assert response.status == HTTPStatus.FORBIDDEN
def _test_users_to_see_task_list(
self, project_id, tasks, users, is_staff, is_allow, is_project_staff, **kwargs
):
if is_staff:
users = [user for user in users if is_project_staff(user["id"], project_id)]
else:
users = [user for user in users if not is_project_staff(user["id"], project_id)]
assert len(users)
for user in users:
if is_allow:
self._test_task_list_200(user["username"], project_id, tasks, **kwargs)
else:
self._test_task_list_403(user["username"], project_id, **kwargs)
def _test_assigned_users_to_see_task_data(self, tasks, users, is_task_staff, **kwargs):
for task in tasks:
staff_users = [user for user in users if is_task_staff(user["id"], task["id"])]
assert len(staff_users)
for user in staff_users:
with make_api_client(user["username"]) as api_client:
(_, response) = api_client.tasks_api.list(**kwargs)
assert response.status == HTTPStatus.OK
response_data = json.loads(response.data)
assert any(_task["id"] == task["id"] for _task in response_data["results"])
@pytest.mark.parametrize("project_id", [1])
@pytest.mark.parametrize(
"groups, is_staff, is_allow",
[
("admin", False, True),
("business", False, False),
],
)
def test_project_tasks_visibility(
self, project_id, groups, users, tasks, is_staff, is_allow, find_users, is_project_staff
):
users = find_users(privilege=groups)
tasks = list(filter(lambda x: x["project_id"] == project_id, tasks))
assert len(tasks)
self._test_users_to_see_task_list(
project_id, tasks, users, is_staff, is_allow, is_project_staff
)
@pytest.mark.parametrize("project_id, groups", [(1, "user")])
def test_task_assigned_to_see_task(
self, project_id, groups, users, tasks, find_users, is_task_staff
):
users = find_users(privilege=groups)
tasks = list(filter(lambda x: x["project_id"] == project_id and x["assignee"], tasks))
assert len(tasks)
self._test_assigned_users_to_see_task_data(tasks, users, is_task_staff)
@pytest.mark.parametrize("org, project_id", [({"id": 2, "slug": "org2"}, 2)])
@pytest.mark.parametrize(
"role, is_staff, is_allow",
[
("maintainer", False, True),
("supervisor", False, False),
],
)
def test_org_project_tasks_visibility(
self,
org,
project_id,
role,
is_staff,
is_allow,
tasks,
is_task_staff,
is_project_staff,
find_users,
):
users = find_users(org=org["id"], role=role)
tasks = list(filter(lambda x: x["project_id"] == project_id, tasks))
assert len(tasks)
self._test_users_to_see_task_list(
project_id, tasks, users, is_staff, is_allow, is_project_staff, org=org["slug"]
)
@pytest.mark.parametrize("org, project_id, role", [({"id": 2, "slug": "org2"}, 2, "worker")])
def test_org_task_assigneed_to_see_task(
self, org, project_id, role, users, tasks, find_users, is_task_staff
):
users = find_users(org=org["id"], role=role)
tasks = list(filter(lambda x: x["project_id"] == project_id and x["assignee"], tasks))
assert len(tasks)
self._test_assigned_users_to_see_task_data(tasks, users, is_task_staff, org=org["slug"])
@pytest.mark.usefixtures("restore_db_per_function")
class TestPostTasks:
def _test_create_task_201(self, user, spec, **kwargs):
with make_api_client(user) as api_client:
(_, response) = api_client.tasks_api.create(spec, **kwargs)
assert response.status == HTTPStatus.CREATED
def _test_create_task_403(self, user, spec, **kwargs):
with make_api_client(user) as api_client:
(_, response) = api_client.tasks_api.create(
spec, **kwargs, _parse_response=False, _check_status=False
)
assert response.status == HTTPStatus.FORBIDDEN
def _test_users_to_create_task_in_project(
self, project_id, users, is_staff, is_allow, is_project_staff, **kwargs
):
if is_staff:
users = [user for user in users if is_project_staff(user["id"], project_id)]
else:
users = [user for user in users if not is_project_staff(user["id"], project_id)]
assert len(users)
for user in users:
username = user["username"]
spec = {
"name": f"test {username} to create a task within a project",
"project_id": project_id,
}
if is_allow:
self._test_create_task_201(username, spec, **kwargs)
else:
self._test_create_task_403(username, spec, **kwargs)
@pytest.mark.parametrize("project_id", [1])
@pytest.mark.parametrize(
"groups, is_staff, is_allow",
[
("admin", False, True),
("business", False, False),
("user", True, True),
],
)
def test_users_to_create_task_in_project(
self, project_id, groups, is_staff, is_allow, is_project_staff, find_users
):
users = find_users(privilege=groups)
self._test_users_to_create_task_in_project(
project_id, users, is_staff, is_allow, is_project_staff
)
@pytest.mark.parametrize("org, project_id", [({"id": 2, "slug": "org2"}, 2)])
@pytest.mark.parametrize(
"role, is_staff, is_allow",
[
("worker", False, False),
],
)
def test_worker_cannot_create_task_in_project_without_ownership(
self, org, project_id, role, is_staff, is_allow, is_project_staff, find_users
):
users = find_users(org=org["id"], role=role)
self._test_users_to_create_task_in_project(
project_id, users, is_staff, is_allow, is_project_staff, org=org["slug"]
)
def test_can_create_task_with_skeleton(self):
username = "admin1"
spec = {
"name": f"test admin1 to create a task with skeleton",
"labels": [
{
"name": "s1",
"color": "#5c5eba",
"attributes": [
{
"name": "color",
"mutable": False,
"input_type": "select",
"default_value": "white",
"values": ["white", "black"],
}
],
"type": "skeleton",
"sublabels": [
{
"name": "1",
"color": "#d53957",
"attributes": [
{
"id": 23,
"name": "attr",
"mutable": False,
"input_type": "select",
"default_value": "val1",
"values": ["val1", "val2"],
}
],
"type": "points",
},
{"name": "2", "color": "#4925ec", "attributes": [], "type": "points"},
{"name": "3", "color": "#59a8fe", "attributes": [], "type": "points"},
],
"svg": '<line x1="36.329429626464844" y1="45.98662185668945" x2="59.07190704345703" y2="23.076923370361328" '
'stroke="black" data-type="edge" data-node-from="2" stroke-width="0.5" data-node-to="3"></line>'
'<line x1="22.61705780029297" y1="25.75250816345215" x2="36.329429626464844" y2="45.98662185668945" '
'stroke="black" data-type="edge" data-node-from="1" stroke-width="0.5" data-node-to="2"></line>'
'<circle r="1.5" stroke="black" fill="#b3b3b3" cx="22.61705780029297" cy="25.75250816345215" '
'stroke-width="0.1" data-type="element node" data-element-id="1" data-node-id="1" data-label-name="1">'
'</circle><circle r="1.5" stroke="black" fill="#b3b3b3" cx="36.329429626464844" cy="45.98662185668945" '
'stroke-width="0.1" data-type="element node" data-element-id="2" data-node-id="2" data-label-name="2"></circle>'
'<circle r="1.5" stroke="black" fill="#b3b3b3" cx="59.07190704345703" cy="23.076923370361328" '
'stroke-width="0.1" data-type="element node" data-element-id="3" data-node-id="3" data-label-name="3"></circle>',
}
],
}
self._test_create_task_201(username, spec)
@pytest.mark.usefixtures("restore_db_per_class")
class TestGetData:
_USERNAME = "user1"
@pytest.mark.parametrize(
"content_type, task_id",
[
("image/png", 8),
("image/png", 5),
("image/x.point-cloud-data", 6),
],
)
def test_frame_content_type(self, content_type, task_id):
with make_api_client(self._USERNAME) as api_client:
(_, response) = api_client.tasks_api.retrieve_data(
task_id, type="frame", quality="original", number=0
)
assert response.status == HTTPStatus.OK
assert response.headers["Content-Type"] == content_type
@pytest.mark.usefixtures("restore_db_per_function")
class TestPatchTaskAnnotations:
def _test_check_response(self, is_allow, response, data=None):
if is_allow:
assert response.status == HTTPStatus.OK
assert DeepDiff(data, json.loads(response.data), exclude_paths="root['version']") == {}
else:
assert response.status == HTTPStatus.FORBIDDEN
@pytest.fixture(scope="class")
def request_data(self, annotations):
def get_data(tid):
data = deepcopy(annotations["task"][str(tid)])
data["shapes"][0].update({"points": [2.0, 3.0, 4.0, 5.0, 6.0, 7.0]})
data["version"] += 1
return data
return get_data
@pytest.mark.parametrize("org", [""])
@pytest.mark.parametrize(
"privilege, task_staff, is_allow",
[
("admin", True, True),
("admin", False, True),
("business", True, True),
("business", False, False),
("worker", True, True),
("worker", False, False),
("user", True, True),
("user", False, False),
],
)
def test_user_update_task_annotations(
self,
org,
privilege,
task_staff,
is_allow,
find_task_staff_user,
find_users,
request_data,
tasks_by_org,
filter_tasks_with_shapes,
):
users = find_users(privilege=privilege)
tasks = tasks_by_org[org]
filtered_tasks = filter_tasks_with_shapes(tasks)
username, tid = find_task_staff_user(filtered_tasks, users, task_staff)
data = request_data(tid)
with make_api_client(username) as api_client:
(_, response) = api_client.tasks_api.partial_update_annotations(
id=tid,
action="update",
org=org,
patched_labeled_data_request=deepcopy(data),
_parse_response=False,
_check_status=False,
)
self._test_check_response(is_allow, response, data)
@pytest.mark.parametrize("org", [2])
@pytest.mark.parametrize(
"role, task_staff, is_allow",
[
("maintainer", False, True),
("owner", False, True),
("supervisor", False, False),
("worker", False, False),
("maintainer", True, True),
("owner", True, True),
("supervisor", True, True),
("worker", True, True),
],
)
def test_member_update_task_annotation(
self,
org,
role,
task_staff,
is_allow,
find_task_staff_user,
find_users,
tasks_by_org,
request_data,
):
users = find_users(role=role, org=org)
tasks = tasks_by_org[org]
username, tid = find_task_staff_user(tasks, users, task_staff, [12, 14])
data = request_data(tid)
with make_api_client(username) as api_client:
(_, response) = api_client.tasks_api.partial_update_annotations(
id=tid,
org_id=org,
action="update",
patched_labeled_data_request=deepcopy(data),
_parse_response=False,
_check_status=False,
)
self._test_check_response(is_allow, response, data)
@pytest.mark.usefixtures("restore_db_per_class")
class TestGetTaskDataset:
def _test_export_task(self, username, tid, **kwargs):
with make_api_client(username) as api_client:
return export_dataset(api_client.tasks_api.retrieve_dataset_endpoint, id=tid, **kwargs)
def test_can_export_task_dataset(self, admin_user, tasks_with_shapes):
task = tasks_with_shapes[0]
response = self._test_export_task(admin_user, task["id"], format="CVAT for images 1.1")
assert response.data
@pytest.mark.usefixtures("restore_db_per_function")
@pytest.mark.usefixtures("restore_cvat_data")
class TestPostTaskData:
_USERNAME = "admin1"
@staticmethod
def _wait_until_task_is_created(api: apis.TasksApi, task_id: int) -> models.RqStatus:
for _ in range(100):
(status, _) = api.retrieve_status(task_id)
if status.state.value in ["Finished", "Failed"]:
return status
sleep(1)
raise Exception("Cannot create task")
def _test_create_task(self, username, spec, data, content_type, **kwargs):
with make_api_client(username) as api_client:
(task, response) = api_client.tasks_api.create(spec, **kwargs)
assert response.status == HTTPStatus.CREATED
(_, response) = api_client.tasks_api.create_data(
task.id, data_request=deepcopy(data), _content_type=content_type, **kwargs
)
assert response.status == HTTPStatus.ACCEPTED
status = self._wait_until_task_is_created(api_client.tasks_api, task.id)
assert status.state.value == "Finished"
return task.id
def _test_cannot_create_task(self, username, spec, data, **kwargs):
with make_api_client(username) as api_client:
(task, response) = api_client.tasks_api.create(spec, **kwargs)
assert response.status == HTTPStatus.CREATED
(_, response) = api_client.tasks_api.create_data(
task.id, data_request=deepcopy(data), _content_type="application/json", **kwargs
)
assert response.status == HTTPStatus.ACCEPTED
status = self._wait_until_task_is_created(api_client.tasks_api, task.id)
assert status.state.value == "Failed"
return status
def test_can_create_task_with_defined_start_and_stop_frames(self):
task_spec = {
"name": f"test {self._USERNAME} to create a task with defined start and stop frames",
"labels": [
{
"name": "car",
"color": "#ff00ff",
"attributes": [
{
"name": "a",
"mutable": True,
"input_type": "number",
"default_value": "5",
"values": ["4", "5", "6"],
}
],
}
],
}
task_data = {
"image_quality": 75,
"start_frame": 2,
"stop_frame": 5,
"client_files": generate_image_files(7),
}
task_id = self._test_create_task(
self._USERNAME, task_spec, task_data, content_type="multipart/form-data"
)
# check task size
with make_api_client(self._USERNAME) as api_client:
(task, _) = api_client.tasks_api.retrieve(task_id)
assert task.size == 4
def test_can_get_annotations_from_new_task_with_skeletons(self):
spec = {
"name": f"test admin1 to create a task with skeleton",
"labels": [
{
"name": "s1",
"color": "#5c5eba",
"attributes": [],
"type": "skeleton",
"sublabels": [
{"name": "1", "color": "#d12345", "attributes": [], "type": "points"},
{"name": "2", "color": "#350dea", "attributes": [], "type": "points"},
],
"svg": '<line x1="19.464284896850586" y1="21.922269821166992" x2="54.08613586425781" y2="43.60293960571289" '
'stroke="black" data-type="edge" data-node-from="1" stroke-width="0.5" data-node-to="2"></line>'
'<circle r="1.5" stroke="black" fill="#b3b3b3" cx="19.464284896850586" cy="21.922269821166992" '
'stroke-width="0.1" data-type="element node" data-element-id="1" data-node-id="1" data-label-id="103"></circle>'
'<circle r="1.5" stroke="black" fill="#b3b3b3" cx="54.08613586425781" cy="43.60293960571289" '
'stroke-width="0.1" data-type="element node" data-element-id="2" data-node-id="2" data-label-id="104"></circle>',
}
],
}
task_data = {
"image_quality": 75,
"client_files": generate_image_files(3),
}
task_id = self._test_create_task(
self._USERNAME, spec, task_data, content_type="multipart/form-data"
)
response = get_method(self._USERNAME, f"tasks/{task_id}")
label_ids = {}
for label in response.json()["labels"]:
label_ids.setdefault(label["type"], []).append(label["id"])
job_id = response.json()["segments"][0]["jobs"][0]["id"]
patch_data = {
"shapes": [
{
"type": "skeleton",
"occluded": False,
"outside": False,
"z_order": 0,
"rotation": 0,
"points": [],
"frame": 0,
"label_id": label_ids["skeleton"][0],
"group": 0,
"source": "manual",
"attributes": [],
"elements": [
{
"type": "points",
"occluded": False,
"outside": False,
"z_order": 0,
"rotation": 0,
"points": [131.63947368421032, 165.0868421052637],
"frame": 0,
"label_id": label_ids["points"][0],
"group": 0,
"source": "manual",
"attributes": [],
},
{
"type": "points",
"occluded": False,
"outside": False,
"z_order": 0,
"rotation": 0,
"points": [354.98157894736823, 304.2710526315795],
"frame": 0,
"label_id": label_ids["points"][1],
"group": 0,
"source": "manual",
"attributes": [],
},
],
}
],
"tracks": [
{
"frame": 0,
"label_id": label_ids["skeleton"][0],
"group": 0,
"source": "manual",
"shapes": [
{
"type": "skeleton",
"occluded": False,
"outside": False,
"z_order": 0,
"rotation": 0,
"points": [],
"frame": 0,
"attributes": [],
}
],
"attributes": [],
"elements": [
{
"frame": 0,
"label_id": label_ids["points"][0],
"group": 0,
"source": "manual",
"shapes": [
{
"type": "points",
"occluded": False,
"outside": False,
"z_order": 0,
"rotation": 0,
"points": [295.6394736842103, 472.5868421052637],
"frame": 0,
"attributes": [],
}
],
"attributes": [],
},
{
"frame": 0,
"label_id": label_ids["points"][1],
"group": 0,
"source": "manual",
"shapes": [
{
"type": "points",
"occluded": False,
"outside": False,
"z_order": 0,
"rotation": 0,
"points": [619.3236842105262, 846.9815789473689],
"frame": 0,
"attributes": [],
}
],
"attributes": [],
},
],
}
],
"tags": [],
"version": 0,
}
response = patch_method(
self._USERNAME, f"jobs/{job_id}/annotations", patch_data, action="create"
)
response = get_method(self._USERNAME, f"jobs/{job_id}/annotations")
assert response.status_code == HTTPStatus.OK
@pytest.mark.with_external_services
@pytest.mark.parametrize(
"cloud_storage_id, manifest, use_bucket_content, org",
[
(1, "manifest.jsonl", False, ""), # public bucket
(2, "sub/manifest.jsonl", True, "org2"), # private bucket
],
)
def test_create_task_with_cloud_storage_files(
self, cloud_storage_id, manifest, use_bucket_content, org
):
if use_bucket_content:
cloud_storage_content = get_cloud_storage_content(
self._USERNAME, cloud_storage_id, manifest
)
else:
cloud_storage_content = ["image_case_65_1.png", "image_case_65_2.png"]
cloud_storage_content.append(manifest)
task_spec = {
"name": f"Task with files from cloud storage {cloud_storage_id}",
"labels": [
{
"name": "car",
}
],
}
data_spec = {
"image_quality": 75,
"use_cache": True,
"cloud_storage_id": cloud_storage_id,
"server_files": cloud_storage_content,
}
self._test_create_task(
self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
)
@pytest.mark.with_external_services
@pytest.mark.parametrize("cloud_storage_id", [1])
@pytest.mark.parametrize(
"manifest, filename_pattern, sub_dir, task_size",
[
("manifest.jsonl", "*", True, 3), # public bucket
("manifest.jsonl", "test/*", True, 3),
("manifest.jsonl", "test/sub*1.jpeg", True, 1),
("manifest.jsonl", "*image*.jpeg", True, 3),
("manifest.jsonl", "wrong_pattern", True, 0),
("abc_manifest.jsonl", "[a-c]*.jpeg", False, 2),
("abc_manifest.jsonl", "[d]*.jpeg", False, 1),
("abc_manifest.jsonl", "[e-z]*.jpeg", False, 0),
],
)
@pytest.mark.parametrize("org", [""])
def test_create_task_with_file_pattern(
self,
cloud_storage_id,
manifest,
filename_pattern,
sub_dir,
task_size,
org,
cloud_storages,
request,
):
# prepare dataset on the bucket
prefixes = ("test_image_",) * 3 if sub_dir else ("a_", "b_", "d_")
images = generate_image_files(3, prefixes=prefixes)
s3_client = s3.make_client()
cloud_storage = cloud_storages[cloud_storage_id]
for image in images:
s3_client.create_file(
data=image,
bucket=cloud_storage["resource"],
filename=f"{'test/sub/' if sub_dir else ''}{image.name}",
)
request.addfinalizer(
partial(
s3_client.remove_file,
bucket=cloud_storage["resource"],
filename=f"{'test/sub/' if sub_dir else ''}{image.name}",
)
)
with TemporaryDirectory() as tmp_dir:
for image in images:
with open(osp.join(tmp_dir, image.name), "wb") as f:
f.write(image.getvalue())
command = [
"docker",
"run",
"--rm",
"-u",
"root:root",
"-v",
f"{tmp_dir}:/local",
"--entrypoint",
"python3",
get_server_image_tag(),
"utils/dataset_manifest/create.py",
"--output-dir",
"/local",
"/local",
]
subprocess.check_output(command)
with open(osp.join(tmp_dir, "manifest.jsonl"), mode="rb") as m_file:
s3_client.create_file(
data=m_file.read(),
bucket=cloud_storage["resource"],
filename=f"test/sub/{manifest}" if sub_dir else manifest,
)
request.addfinalizer(
partial(
s3_client.remove_file,
bucket=cloud_storage["resource"],
filename=f"test/sub/{manifest}" if sub_dir else manifest,
)
)
task_spec = {
"name": f"Task with files from cloud storage {cloud_storage_id}",
"labels": [
{
"name": "car",
}
],
}
data_spec = {
"image_quality": 75,
"use_cache": True,
"cloud_storage_id": cloud_storage_id,
"server_files": [f"test/sub/{manifest}" if sub_dir else manifest],
"filename_pattern": filename_pattern,
}
if task_size:
task_id = self._test_create_task(
self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
)
with make_api_client(self._USERNAME) as api_client:
(task, response) = api_client.tasks_api.retrieve(task_id, org=org)
assert response.status == HTTPStatus.OK
assert task.size == task_size
else:
status = self._test_cannot_create_task(self._USERNAME, task_spec, data_spec)
assert "No media data found" in status.message
@pytest.mark.with_external_services
@pytest.mark.parametrize(
"cloud_storage_id, manifest, org",
[(1, "manifest.jsonl", "")], # public bucket
)
def test_cannot_create_task_with_mythical_cloud_storage_data(
self, cloud_storage_id, manifest, org
):
mythical_file = "mythical.jpg"
cloud_storage_content = [mythical_file, manifest]
task_spec = {
"name": f"Task with mythical file from cloud storage {cloud_storage_id}",
"labels": [{"name": "car"}],
}
data_spec = {
"image_quality": 75,
"use_cache": True,
"cloud_storage_id": cloud_storage_id,
"server_files": cloud_storage_content,
}
status = self._test_cannot_create_task(self._USERNAME, task_spec, data_spec, org=org)
assert mythical_file in status.message
@pytest.mark.usefixtures("restore_db_per_class")
class TestGetTaskPreview:
def _test_task_preview_200(self, username, task_id, **kwargs):
with make_api_client(username) as api_client:
(_, response) = api_client.tasks_api.retrieve_preview(task_id, **kwargs)
assert response.status == HTTPStatus.OK
(width, height) = Image.open(io.BytesIO(response.data)).size
assert width > 0 and height > 0
def _test_task_preview_403(self, username, task_id):
with make_api_client(username) as api_client:
(_, response) = api_client.tasks_api.retrieve_preview(
task_id, _parse_response=False, _check_status=False
)
assert response.status == HTTPStatus.FORBIDDEN
def _test_assigned_users_to_see_task_preview(self, tasks, users, is_task_staff, **kwargs):
for task in tasks:
staff_users = [user for user in users if is_task_staff(user["id"], task["id"])]
assert len(staff_users)
for user in staff_users:
self._test_task_preview_200(user["username"], task["id"], **kwargs)
def _test_assigned_users_cannot_see_task_preview(self, tasks, users, is_task_staff, **kwargs):
for task in tasks:
not_staff_users = [user for user in users if not is_task_staff(user["id"], task["id"])]
assert len(not_staff_users)
for user in not_staff_users:
self._test_task_preview_403(user["username"], task["id"], **kwargs)
@pytest.mark.parametrize("project_id, groups", [(1, "user")])
def test_task_assigned_to_see_task_preview(
self, project_id, groups, users, tasks, find_users, is_task_staff
):
users = find_users(privilege=groups)
tasks = list(filter(lambda x: x["project_id"] == project_id and x["assignee"], tasks))
assert len(tasks)
self._test_assigned_users_to_see_task_preview(tasks, users, is_task_staff)
@pytest.mark.parametrize("org, project_id, role", [({"id": 2, "slug": "org2"}, 2, "worker")])
def test_org_task_assigneed_to_see_task_preview(
self, org, project_id, role, users, tasks, find_users, is_task_staff
):
users = find_users(org=org["id"], role=role)
tasks = list(filter(lambda x: x["project_id"] == project_id and x["assignee"], tasks))
assert len(tasks)
self._test_assigned_users_to_see_task_preview(tasks, users, is_task_staff, org=org["slug"])
@pytest.mark.parametrize("project_id, groups", [(1, "user")])
def test_task_unassigned_cannot_see_task_preview(
self, project_id, groups, users, tasks, find_users, is_task_staff
):
users = find_users(privilege=groups)
tasks = list(filter(lambda x: x["project_id"] == project_id and x["assignee"], tasks))
assert len(tasks)
self._test_assigned_users_cannot_see_task_preview(tasks, users, is_task_staff)