You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
561 lines
20 KiB
Python
561 lines
20 KiB
Python
# Copyright (C) 2022 CVAT.ai Corporation
|
|
#
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
import json
|
|
from contextlib import contextmanager
|
|
from functools import partial
|
|
from http import HTTPStatus
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from uuid import uuid4
|
|
|
|
import boto3
|
|
import pytest
|
|
from cvat_sdk import Client, exceptions
|
|
from cvat_sdk.api_client import ApiClient, models
|
|
from cvat_sdk.core.client import Config
|
|
from cvat_sdk.core.proxies.projects import Project
|
|
from cvat_sdk.core.proxies.tasks import ResourceType, Task
|
|
|
|
from shared.utils.config import (
|
|
BASE_URL,
|
|
MINIO_ENDPOINT_URL,
|
|
MINIO_KEY,
|
|
MINIO_SECRET_KEY,
|
|
USER_PASS,
|
|
post_method,
|
|
)
|
|
from shared.utils.helpers import generate_image_file
|
|
|
|
|
|
@pytest.fixture
|
|
def fxt_image_file(tmp_path: Path):
|
|
img_path = tmp_path / "img.png"
|
|
with img_path.open("wb") as f:
|
|
f.write(generate_image_file(filename=str(img_path), size=(5, 10)).getvalue())
|
|
|
|
return img_path
|
|
|
|
|
|
def get_common_storage_params():
|
|
return {
|
|
"provider_type": "AWS_S3_BUCKET",
|
|
"credentials_type": "KEY_SECRET_KEY_PAIR",
|
|
"key": "minio_access_key",
|
|
"secret_key": "minio_secret_key",
|
|
"specific_attributes": "endpoint_url=http://minio:9000",
|
|
}
|
|
|
|
|
|
def define_s3_client():
|
|
s3 = boto3.resource(
|
|
"s3",
|
|
aws_access_key_id=MINIO_KEY,
|
|
aws_secret_access_key=MINIO_SECRET_KEY,
|
|
endpoint_url=MINIO_ENDPOINT_URL,
|
|
)
|
|
return s3.meta.client
|
|
|
|
|
|
class TestUserLimits:
|
|
@classmethod
|
|
def _create_user(cls, api_client: ApiClient, email: str) -> str:
|
|
username = email.split("@", maxsplit=1)[0]
|
|
with api_client:
|
|
(user, _) = api_client.auth_api.create_register(
|
|
models.RegisterSerializerExRequest(
|
|
username=username, password1=USER_PASS, password2=USER_PASS, email=email
|
|
)
|
|
)
|
|
|
|
api_client.cookies.clear()
|
|
|
|
return user.username
|
|
|
|
def _make_client(self) -> Client:
|
|
return Client(BASE_URL, config=Config(status_check_period=0.01))
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup(self, restore_db_per_function, tmp_path: Path, fxt_image_file: Path):
|
|
self.tmp_dir = tmp_path
|
|
self.image_file = fxt_image_file
|
|
|
|
self.client = self._make_client()
|
|
self.user = self._create_user(self.client.api_client, email="test_user_limits@localhost")
|
|
|
|
with self.client:
|
|
self.client.login((self.user, USER_PASS))
|
|
|
|
@pytest.fixture
|
|
def fxt_another_client(self) -> Client:
|
|
client = self._make_client()
|
|
user = self._create_user(self.client.api_client, email="test_user_limits2@localhost")
|
|
|
|
with client:
|
|
client.login((user, USER_PASS))
|
|
yield client
|
|
|
|
_DEFAULT_TASKS_LIMIT = 10
|
|
_DEFAULT_PROJECT_TASKS_LIMIT = 5
|
|
_DEFAULT_PROJECTS_LIMIT = 3
|
|
_DEFAULT_ORGS_LIMIT = 1
|
|
_DEFAULT_CLOUD_STORAGES_LIMIT = 10
|
|
|
|
_TASK_LIMIT_MESSAGE = "user tasks limit reached"
|
|
_PROJECT_TASK_LIMIT_MESSAGE = "user project tasks limit reached"
|
|
_PROJECTS_LIMIT_MESSAGE = "user projects limit reached"
|
|
_ORGS_LIMIT_MESSAGE = "user orgs limit reached"
|
|
_CLOUD_STORAGES_LIMIT_MESSAGE = "user cloud storages limit reached"
|
|
|
|
def _create_task(
|
|
self, *, project: Optional[int] = None, client: Optional[Client] = None
|
|
) -> Task:
|
|
if client is None:
|
|
client = self.client
|
|
|
|
return client.tasks.create_from_data(
|
|
spec=models.TaskWriteRequest(
|
|
name="test_task",
|
|
labels=[models.PatchedLabelRequest(name="cat")] if not project else [],
|
|
project_id=project,
|
|
),
|
|
resource_type=ResourceType.LOCAL,
|
|
resources=[str(self.image_file)],
|
|
)
|
|
|
|
def _create_project(self, *, client: Optional[Client] = None) -> Project:
|
|
if client is None:
|
|
client = self.client
|
|
|
|
return client.projects.create(models.ProjectWriteRequest(name="test_project"))
|
|
|
|
def test_can_reach_tasks_limit(self):
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT):
|
|
self._create_task()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_task()
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._TASK_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_tasks_limit_when_importing_backup(self):
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT):
|
|
task = self._create_task()
|
|
|
|
backup_filename = self.tmp_dir / "task_backup.zip"
|
|
task.download_backup(backup_filename)
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self.client.tasks.create_from_backup(backup_filename)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._TASK_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_tasks_limit_when_creating_in_project(self):
|
|
project = self._create_project().id
|
|
|
|
for _ in range(self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(project=project)
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_task(project=project)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._PROJECT_TASK_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_tasks_limit_when_creating_in_different_projects(self):
|
|
project1 = self._create_project().id
|
|
project2 = self._create_project().id
|
|
|
|
for _ in range(self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(project=project1)
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT - self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(project=project2)
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_task()
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._TASK_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_tasks_limit_when_creating_in_filled_project(self):
|
|
project = self._create_project().id
|
|
|
|
for _ in range(self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(project=project)
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT - self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_task(project=project)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {
|
|
self._TASK_LIMIT_MESSAGE,
|
|
self._PROJECT_TASK_LIMIT_MESSAGE,
|
|
}
|
|
|
|
def test_can_reach_project_tasks_limit_when_moving_into_filled_project(self):
|
|
project = self._create_project().id
|
|
for _ in range(self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(project=project)
|
|
|
|
task = self._create_task()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
task.update(models.PatchedTaskWriteRequest(project_id=project))
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._PROJECT_TASK_LIMIT_MESSAGE}
|
|
|
|
@pytest.mark.xfail(
|
|
raises=AssertionError, reason="only admins can change ownership, but they ignore limits"
|
|
)
|
|
def test_can_reach_tasks_limit_when_giving_away_to_another_user(
|
|
self, fxt_another_client: Client
|
|
):
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT):
|
|
self._create_task(client=fxt_another_client)
|
|
|
|
task = self._create_task()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
task.update(
|
|
models.PatchedTaskWriteRequest(
|
|
owner_id=fxt_another_client.users.retrieve_current_user().id
|
|
)
|
|
)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._PROJECT_TASK_LIMIT_MESSAGE}
|
|
|
|
@pytest.mark.xfail(
|
|
raises=AssertionError, reason="only admins can change ownership, but they ignore limits"
|
|
)
|
|
def test_can_reach_project_tasks_limit_when_giving_away_to_another_users_filled_project(
|
|
self, fxt_another_client: Client
|
|
):
|
|
project = self._create_project(client=fxt_another_client).id
|
|
|
|
for _ in range(self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(client=fxt_another_client, project=project)
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT - self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(client=fxt_another_client)
|
|
|
|
task = self._create_task()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
task.update(
|
|
models.PatchedTaskWriteRequest(
|
|
owner_id=fxt_another_client.users.retrieve_current_user().id
|
|
)
|
|
)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {
|
|
self._DEFAULT_TASKS_LIMIT,
|
|
self._PROJECT_TASK_LIMIT_MESSAGE,
|
|
}
|
|
|
|
@pytest.mark.xfail(
|
|
raises=AssertionError, reason="only admins can change ownership, but they ignore limits"
|
|
)
|
|
def test_can_reach_projects_limit_when_giving_away_to_another_user(
|
|
self, fxt_another_client: Client
|
|
):
|
|
for _ in range(self._DEFAULT_PROJECTS_LIMIT):
|
|
self._create_project(client=fxt_another_client)
|
|
|
|
project = self._create_project()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
project.update(
|
|
models.PatchedProjectWriteRequest(
|
|
owner_id=fxt_another_client.users.retrieve_current_user().id
|
|
)
|
|
)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._PROJECT_TASK_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_projects_limit(self):
|
|
for _ in range(self._DEFAULT_PROJECTS_LIMIT):
|
|
self._create_project()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_project()
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._PROJECTS_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_projects_limit_when_importing_backup(self):
|
|
for _ in range(self._DEFAULT_PROJECTS_LIMIT):
|
|
project = self._create_project()
|
|
|
|
backup_filename = self.tmp_dir / (project.name + "_backup.zip")
|
|
project.download_backup(backup_filename)
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self.client.projects.create_from_backup(backup_filename)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._PROJECTS_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_orgs_limit(self):
|
|
for i in range(self._DEFAULT_ORGS_LIMIT):
|
|
(_, response) = self.client.api_client.organizations_api.create(
|
|
models.OrganizationWriteRequest(slug=f"test_user_orgs_{i}"), _parse_response=False
|
|
)
|
|
assert response.status == HTTPStatus.CREATED
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self.client.api_client.organizations_api.create(
|
|
models.OrganizationWriteRequest(slug=f"test_user_orgs_{i}"), _parse_response=False
|
|
)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._ORGS_LIMIT_MESSAGE}
|
|
|
|
@pytest.mark.with_external_services
|
|
def test_can_reach_cloud_storages_limit(self, request: pytest.FixtureRequest):
|
|
storage_params = get_common_storage_params()
|
|
|
|
# TODO: refactor after https://github.com/opencv/cvat/pull/4819
|
|
s3_client = define_s3_client()
|
|
|
|
def _create_bucket(name: str) -> str:
|
|
name = name + str(uuid4())
|
|
s3_client.create_bucket(Bucket=name)
|
|
request.addfinalizer(partial(s3_client.delete_bucket, Bucket=name))
|
|
return name
|
|
|
|
def _add_storage(idx: int):
|
|
response = post_method(
|
|
self.user,
|
|
"cloudstorages",
|
|
{
|
|
"display_name": f"test_storage{idx}",
|
|
"resource": _create_bucket(f"testbucket{idx}"),
|
|
**storage_params,
|
|
},
|
|
)
|
|
return response
|
|
|
|
for i in range(self._DEFAULT_CLOUD_STORAGES_LIMIT):
|
|
response = _add_storage(i)
|
|
assert response.status_code == HTTPStatus.CREATED
|
|
|
|
response = _add_storage(i)
|
|
|
|
assert response.status_code == HTTPStatus.FORBIDDEN
|
|
assert set(response.json()) == {self._CLOUD_STORAGES_LIMIT_MESSAGE}
|
|
|
|
|
|
class TestOrgLimits:
|
|
@classmethod
|
|
def _create_org(cls, api_client: ApiClient) -> str:
|
|
with api_client:
|
|
(_, response) = api_client.organizations_api.create(
|
|
models.OrganizationWriteRequest(slug="test_org_limits"), _parse_response=False
|
|
)
|
|
|
|
return json.loads(response.data)
|
|
|
|
def _make_client(self) -> Client:
|
|
return Client(BASE_URL, config=Config(status_check_period=0.01))
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup(
|
|
self, restore_db_per_function, tmp_path: Path, regular_user: str, fxt_image_file: Path
|
|
):
|
|
self.tmp_dir = tmp_path
|
|
self.image_file = fxt_image_file
|
|
|
|
self.client = self._make_client()
|
|
self.user = regular_user
|
|
|
|
with self.client:
|
|
self.client.login((self.user, USER_PASS))
|
|
|
|
org = self._create_org(self.client.api_client)
|
|
self.org = org["id"]
|
|
self.org_slug = org["slug"]
|
|
|
|
with self._patch_client_with_org(self.client):
|
|
yield
|
|
|
|
_DEFAULT_TASKS_LIMIT = 10
|
|
_DEFAULT_PROJECT_TASKS_LIMIT = 5
|
|
_DEFAULT_PROJECTS_LIMIT = 3
|
|
_DEFAULT_CLOUD_STORAGES_LIMIT = 10
|
|
|
|
_TASK_LIMIT_MESSAGE = "org tasks limit reached"
|
|
_PROJECT_TASK_LIMIT_MESSAGE = "org project tasks limit reached"
|
|
_PROJECTS_LIMIT_MESSAGE = "org projects limit reached"
|
|
_CLOUD_STORAGES_LIMIT_MESSAGE = "org cloud storages limit reached"
|
|
|
|
@contextmanager
|
|
def _patch_client_with_org(self, client: Optional[Client] = None):
|
|
if client is None:
|
|
client = self.client
|
|
|
|
new_headers = self.client.api_client.default_headers.copy()
|
|
new_headers["X-Organization"] = self.org_slug
|
|
with pytest.MonkeyPatch.context() as monkeypatch:
|
|
monkeypatch.setattr(client.api_client, "default_headers", new_headers)
|
|
yield client
|
|
|
|
@pytest.fixture
|
|
def fxt_patch_client_with_org(self):
|
|
with self._patch_client_with_org(self.client):
|
|
yield
|
|
|
|
def _create_task(
|
|
self, *, project: Optional[int] = None, client: Optional[Client] = None
|
|
) -> Task:
|
|
if client is None:
|
|
client = self.client
|
|
|
|
return client.tasks.create_from_data(
|
|
spec=models.TaskWriteRequest(
|
|
name="test_task",
|
|
labels=[models.PatchedLabelRequest(name="cat")] if not project else [],
|
|
project_id=project,
|
|
),
|
|
resource_type=ResourceType.LOCAL,
|
|
resources=[str(self.image_file)],
|
|
)
|
|
|
|
def _create_project(self, *, client: Optional[Client] = None) -> Project:
|
|
if client is None:
|
|
client = self.client
|
|
|
|
return client.projects.create(models.ProjectWriteRequest(name="test_project"))
|
|
|
|
def test_can_reach_tasks_limit(self):
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT):
|
|
self._create_task()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_task()
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._TASK_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_tasks_limit_when_importing_backup(self):
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT):
|
|
task = self._create_task()
|
|
|
|
backup_filename = self.tmp_dir / "task_backup.zip"
|
|
task.download_backup(backup_filename)
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self.client.tasks.create_from_backup(backup_filename)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._TASK_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_tasks_limit_when_creating_in_project(self):
|
|
project = self._create_project().id
|
|
|
|
for _ in range(self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(project=project)
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_task(project=project)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._PROJECT_TASK_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_tasks_limit_when_creating_in_different_projects(self):
|
|
project1 = self._create_project().id
|
|
project2 = self._create_project().id
|
|
|
|
for _ in range(self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(project=project1)
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT - self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(project=project2)
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_task()
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._TASK_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_tasks_limit_when_creating_in_filled_project(self):
|
|
project = self._create_project().id
|
|
|
|
for _ in range(self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task(project=project)
|
|
for _ in range(self._DEFAULT_TASKS_LIMIT - self._DEFAULT_PROJECT_TASKS_LIMIT):
|
|
self._create_task()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_task(project=project)
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {
|
|
self._TASK_LIMIT_MESSAGE,
|
|
self._PROJECT_TASK_LIMIT_MESSAGE,
|
|
}
|
|
|
|
def test_can_reach_projects_limit(self):
|
|
for _ in range(self._DEFAULT_PROJECTS_LIMIT):
|
|
self._create_project()
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self._create_project()
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._PROJECTS_LIMIT_MESSAGE}
|
|
|
|
def test_can_reach_projects_limit_when_importing_backup(self):
|
|
for _ in range(self._DEFAULT_PROJECTS_LIMIT):
|
|
project = self._create_project()
|
|
|
|
backup_filename = self.tmp_dir / "test_project_backup.zip"
|
|
project.download_backup(str(backup_filename))
|
|
|
|
with pytest.raises(exceptions.ApiException) as capture:
|
|
self.client.projects.create_from_backup(str(backup_filename))
|
|
|
|
assert capture.value.status == HTTPStatus.FORBIDDEN
|
|
assert set(json.loads(capture.value.body)) == {self._PROJECTS_LIMIT_MESSAGE}
|
|
|
|
@pytest.mark.with_external_services
|
|
def test_can_reach_cloud_storages_limit(self, request: pytest.FixtureRequest):
|
|
storage_params = get_common_storage_params()
|
|
|
|
# TODO: refactor after https://github.com/opencv/cvat/pull/4819
|
|
s3_client = define_s3_client()
|
|
|
|
def _create_bucket(name: str) -> str:
|
|
name = name + str(uuid4())
|
|
s3_client.create_bucket(Bucket=name)
|
|
request.addfinalizer(partial(s3_client.delete_bucket, Bucket=name))
|
|
return name
|
|
|
|
def _add_storage(idx: int):
|
|
response = post_method(
|
|
self.user,
|
|
"cloudstorages",
|
|
{
|
|
"display_name": f"test_storage{idx}",
|
|
"resource": _create_bucket(f"testbucket{idx}"),
|
|
**storage_params,
|
|
},
|
|
org_id=self.org,
|
|
)
|
|
return response
|
|
|
|
for i in range(self._DEFAULT_CLOUD_STORAGES_LIMIT):
|
|
response = _add_storage(i)
|
|
assert response.status_code == HTTPStatus.CREATED
|
|
|
|
response = _add_storage(i)
|
|
|
|
assert response.status_code == HTTPStatus.FORBIDDEN
|
|
assert set(response.json()) == {self._CLOUD_STORAGES_LIMIT_MESSAGE}
|