|
|
|
|
@ -6,9 +6,14 @@ import tempfile
|
|
|
|
|
from http import HTTPStatus
|
|
|
|
|
from itertools import groupby, product
|
|
|
|
|
|
|
|
|
|
from time import sleep
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
from copy import deepcopy
|
|
|
|
|
from deepdiff import DeepDiff
|
|
|
|
|
|
|
|
|
|
from .utils.config import get_method, post_files_method, post_method
|
|
|
|
|
from .utils.config import (get_method, patch_method, post_files_method,
|
|
|
|
|
post_method)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures('dontchangedb')
|
|
|
|
|
@ -113,6 +118,109 @@ class TestGetProjects:
|
|
|
|
|
|
|
|
|
|
self._test_response_200(user_in_project['username'], project_id, org_id=user_in_project['org'])
|
|
|
|
|
|
|
|
|
|
class TestGetProjectBackup:
|
|
|
|
|
def _test_can_get_project_backup(self, username, pid, **kwargs):
|
|
|
|
|
for _ in range(30):
|
|
|
|
|
response = get_method(username, f"projects/{pid}/backup", **kwargs)
|
|
|
|
|
response.raise_for_status()
|
|
|
|
|
if response.status_code == HTTPStatus.CREATED:
|
|
|
|
|
break
|
|
|
|
|
sleep(1)
|
|
|
|
|
response = get_method(username, f"projects/{pid}/backup", action="download", **kwargs)
|
|
|
|
|
assert response.status_code == HTTPStatus.OK
|
|
|
|
|
|
|
|
|
|
def _test_cannot_get_project_backup(self, username, pid, **kwargs):
|
|
|
|
|
response = get_method(username, f"projects/{pid}/backup", **kwargs)
|
|
|
|
|
assert response.status_code == HTTPStatus.FORBIDDEN
|
|
|
|
|
|
|
|
|
|
def test_admin_can_get_project_backup(self, projects):
|
|
|
|
|
project = list(projects)[0]
|
|
|
|
|
self._test_can_get_project_backup('admin1', project['id'])
|
|
|
|
|
|
|
|
|
|
# User that not in [project:owner, project:assignee] cannot get project backup.
|
|
|
|
|
def test_user_cannot_get_project_backup(self, find_users, projects, is_project_staff):
|
|
|
|
|
users = find_users(exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if not is_project_staff(user['id'], project['id'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._test_cannot_get_project_backup(user['username'], project['id'])
|
|
|
|
|
|
|
|
|
|
# Org worker that not in [project:owner, project:assignee] cannot get project backup.
|
|
|
|
|
def test_org_worker_cannot_get_project_backup(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='worker', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._test_cannot_get_project_backup(user['username'], project['id'], org_id=project['organization'])
|
|
|
|
|
|
|
|
|
|
# Org worker that in [project:owner, project:assignee] can get project backup.
|
|
|
|
|
def test_org_worker_can_get_project_backup(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='worker', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._test_can_get_project_backup(user['username'], project['id'], org_id=project['organization'])
|
|
|
|
|
|
|
|
|
|
# Org supervisor that in [project:owner, project:assignee] can get project backup.
|
|
|
|
|
def test_org_supervisor_can_get_project_backup(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='supervisor', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._test_can_get_project_backup(user['username'], project['id'], org_id=project['organization'])
|
|
|
|
|
|
|
|
|
|
# Org supervisor that not in [project:owner, project:assignee] cannot get project backup.
|
|
|
|
|
def test_org_supervisor_cannot_get_project_backup(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='supervisor', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._test_cannot_get_project_backup(user['username'], project['id'], org_id=project['organization'])
|
|
|
|
|
|
|
|
|
|
# Org maintainer that not in [project:owner, project:assignee] can get project backup.
|
|
|
|
|
def test_org_maintainer_can_get_project_backup(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='maintainer', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._test_can_get_project_backup(user['username'], project['id'], org_id=project['organization'])
|
|
|
|
|
|
|
|
|
|
# Org owner that not in [project:owner, project:assignee] can get project backup.
|
|
|
|
|
def test_org_owner_can_get_project_backup(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='owner', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self._test_can_get_project_backup(user['username'], project['id'], org_id=project['organization'])
|
|
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures('changedb')
|
|
|
|
|
class TestPostProjects:
|
|
|
|
|
def _test_create_project_201(self, user, spec, **kwargs):
|
|
|
|
|
@ -243,3 +351,101 @@ class TestImportExportDatasetProject:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self._test_import_project(username, project_id, 'CVAT 1.1', import_data)
|
|
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures('changedb')
|
|
|
|
|
class TestPatchProjectLabel:
|
|
|
|
|
def test_admin_can_delete_label(self, projects):
|
|
|
|
|
project = deepcopy(list(projects)[0])
|
|
|
|
|
labels = project['labels'][0]
|
|
|
|
|
labels.update({'deleted': True})
|
|
|
|
|
response = patch_method('admin1', f'/projects/{project["id"]}', {'labels': [labels]})
|
|
|
|
|
assert response.status_code == HTTPStatus.OK
|
|
|
|
|
assert len(response.json()['labels']) == len(project['labels']) - 1
|
|
|
|
|
|
|
|
|
|
def test_admin_can_rename_label(self, projects):
|
|
|
|
|
project = deepcopy(list(projects)[0])
|
|
|
|
|
labels = project['labels'][0]
|
|
|
|
|
labels.update({'name': 'new name'})
|
|
|
|
|
response = patch_method('admin1', f'/projects/{project["id"]}', {'labels': [labels]})
|
|
|
|
|
assert response.status_code == HTTPStatus.OK
|
|
|
|
|
assert DeepDiff(response.json()['labels'], project['labels'], ignore_order=True) == {}
|
|
|
|
|
|
|
|
|
|
def test_admin_can_add_label(self, projects):
|
|
|
|
|
project = list(projects)[0]
|
|
|
|
|
labels = {'name': 'new name'}
|
|
|
|
|
response = patch_method('admin1', f'/projects/{project["id"]}', {'labels': [labels]})
|
|
|
|
|
assert response.status_code == HTTPStatus.OK
|
|
|
|
|
assert len(response.json()['labels']) == len(project['labels']) + 1
|
|
|
|
|
|
|
|
|
|
# Org maintainer can add label even he is not in [project:owner, project:assignee]
|
|
|
|
|
def test_org_maintainer_can_add_label(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='maintainer', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
labels = {'name': 'new name'}
|
|
|
|
|
response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization'])
|
|
|
|
|
assert response.status_code == HTTPStatus.OK
|
|
|
|
|
assert len(response.json()['labels']) == len(project['labels']) + 1
|
|
|
|
|
|
|
|
|
|
# Org supervisor cannot add label
|
|
|
|
|
def test_org_supervisor_can_add_label(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='supervisor', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
labels = {'name': 'new name'}
|
|
|
|
|
response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization'])
|
|
|
|
|
assert response.status_code == HTTPStatus.FORBIDDEN
|
|
|
|
|
|
|
|
|
|
# Org worker cannot add label
|
|
|
|
|
def test_org_worker_cannot_add_label(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='worker', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
labels = {'name': 'new name'}
|
|
|
|
|
response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization'])
|
|
|
|
|
assert response.status_code == HTTPStatus.FORBIDDEN
|
|
|
|
|
|
|
|
|
|
# Org worker that in [project:owner, project:assignee] can add label
|
|
|
|
|
def test_org_worker_can_add_label(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='worker', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
labels = {'name': 'new name'}
|
|
|
|
|
response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization'])
|
|
|
|
|
assert response.status_code == HTTPStatus.OK
|
|
|
|
|
assert len(response.json()['labels']) == len(project['labels']) + 1
|
|
|
|
|
|
|
|
|
|
# Org owner can add label even he is not in [project:owner, project:assignee]
|
|
|
|
|
def test_org_owner_can_add_label(self, find_users, projects, is_project_staff, is_org_member):
|
|
|
|
|
users = find_users(role='owner', exclude_privilege='admin')
|
|
|
|
|
|
|
|
|
|
user, project = next(
|
|
|
|
|
(user, project)
|
|
|
|
|
for user, project in product(users, projects)
|
|
|
|
|
if not is_project_staff(user['id'], project['id']) and is_org_member(user['id'], project['organization'])
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
labels = {'name': 'new name'}
|
|
|
|
|
response = patch_method(user['username'], f'/projects/{project["id"]}', {'labels': [labels]}, org_id=project['organization'])
|
|
|
|
|
assert response.status_code == HTTPStatus.OK
|
|
|
|
|
assert len(response.json()['labels']) == len(project['labels']) + 1
|
|
|
|
|
|