REST API tests for GET jobs + GET jobs/ID/annotations (#4295)

main
Nikita Manovich 4 years ago committed by GitHub
parent 871b150d79
commit 331e4ca73c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -32,5 +32,10 @@
"name": "cvat",
"database": "${workspaceFolder:cvat}/db.sqlite3"
}
]
],
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

@ -85,7 +85,8 @@ class InvitationWriteSerializer(serializers.ModelSerializer):
f'with {membership_data["user"]["email"]} email. It is not '
f'a valid email in the system.')
membership, created = Membership.objects.get_or_create(**membership_data,
membership, created = Membership.objects.get_or_create(
defaults=membership_data,
user=user, organization=organization)
if not created:
raise serializers.ValidationError('The user is a member of '

File diff suppressed because it is too large Load Diff

@ -50,27 +50,69 @@ def init_test_db():
def restore():
restore_cvat_db()
class Container:
def __init__(self, data, key='id'):
self.raw_data = data
self.map_data = { obj[key]: obj for obj in data }
@property
def raw(self):
return self.raw_data
@property
def map(self):
return self.map_data
def __iter__(self):
return iter(self.raw_data)
def __len__(self):
return len(self.raw_data)
def __getitem__(self, key):
if isinstance(key, slice):
return self.raw_data[key]
return self.map_data[key]
@pytest.fixture(scope='module')
def users():
with open(osp.join(ASSETS_DIR, 'users.json')) as f:
return json.load(f)['results']
return Container(json.load(f)['results'])
@pytest.fixture(scope='module')
def organizations():
with open(osp.join(ASSETS_DIR, 'organizations.json')) as f:
data = json.load(f)
def _organizations(org_id=None):
if org_id:
return [org for org in data if org['id'] == org_id][0]
return data
return _organizations
return Container(json.load(f))
@pytest.fixture(scope='module')
def memberships():
with open(osp.join(ASSETS_DIR, 'memberships.json')) as f:
return json.load(f)['results']
return Container(json.load(f)['results'])
@pytest.fixture(scope='module')
def tasks():
with open(osp.join(ASSETS_DIR, 'tasks.json')) as f:
return Container(json.load(f)['results'])
@pytest.fixture(scope='module')
def projects():
with open(osp.join(ASSETS_DIR, 'projects.json')) as f:
return Container(json.load(f)['results'])
@pytest.fixture(scope='module')
def jobs():
with open(osp.join(ASSETS_DIR, 'jobs.json')) as f:
return Container(json.load(f)['results'])
@pytest.fixture(scope='module')
def invitations():
with open(osp.join(ASSETS_DIR, 'invitations.json')) as f:
return Container(json.load(f)['results'], key='key')
@pytest.fixture(scope='module')
def annotations():
with open(osp.join(ASSETS_DIR, 'annotations.json')) as f:
return json.load(f)
@pytest.fixture(scope='module')
def users_by_name(users):
@ -116,7 +158,3 @@ def test_db(users, users_by_name, memberships):
membership_id=membership['id'])
return data

@ -13,9 +13,15 @@ import pytest
def test_check_objects_integrity(path):
with open(path) as f:
endpoint = osp.basename(path).rsplit('.')[0]
response = config.get_method('admin1', endpoint, page_size='all')
json_objs = json.load(f)
resp_objs = response.json()
if endpoint == 'annotations':
objects = json.load(f)
for jid, annotations in objects['job'].items():
response = config.get_method('admin1', f'jobs/{jid}/annotations').json()
assert DeepDiff(annotations, response, ignore_order=True) == {}
else:
response = config.get_method('admin1', endpoint, page_size='all')
json_objs = json.load(f)
resp_objs = response.json()
assert DeepDiff(json_objs, resp_objs, ignore_order=True,
exclude_regex_paths="root\['results'\]\[\d+\]\['last_login'\]") == {}
assert DeepDiff(json_objs, resp_objs, ignore_order=True,
exclude_regex_paths="root\['results'\]\[\d+\]\['last_login'\]") == {}

@ -24,7 +24,7 @@ class TestGetUsers:
def test_admin_can_see_all_others(self, users):
exclude_paths = [f"root[{i}]['last_login']" for i in range(len(users))]
self._test_can_see('admin2', users, exclude_paths=exclude_paths,
self._test_can_see('admin2', users.raw, exclude_paths=exclude_paths,
page_size="all")
def test_everybody_can_see_self(self, users_by_name):

@ -30,7 +30,7 @@ class TestGetOrganizations:
response = get_method(user, f'organizations/{self._ORG}')
if is_allow:
assert response.status_code == HTTPStatus.OK
assert DeepDiff(organizations(self._ORG), response.json()) == {}
assert DeepDiff(organizations[self._ORG], response.json()) == {}
else:
assert response.status_code == HTTPStatus.NOT_FOUND
@ -44,7 +44,7 @@ class TestPatchOrganizations:
@pytest.fixture(scope='class')
def expected_data(self, organizations, request_data):
data = organizations(self._ORG).copy()
data = organizations[self._ORG].copy()
data.update(request_data)
return data

@ -21,16 +21,17 @@ class TestGetMemberships:
assert response.status_code == HTTPStatus.FORBIDDEN
def test_admin_can_see_all_memberships(self, memberships):
self._test_can_see_memberships('admin2', memberships, page_size='all')
self._test_can_see_memberships('admin2', memberships.raw, page_size='all')
def test_non_admin_can_see_only_self_memberships(self, memberships):
non_admins= ['business1', 'user1', 'dummy1','worker2']
for user in non_admins:
data = [m for m in memberships if m['user']['username'] == user]
self._test_can_see_memberships(user, data)
for username in non_admins:
data = [obj for obj in memberships
if obj['user']['username'] == username]
self._test_can_see_memberships(username, data)
def test_all_members_can_see_other_members_membership(self, memberships):
data = [m for m in memberships if m['organization'] == 1]
data = [obj for obj in memberships if obj['organization'] == 1]
for membership in data:
self._test_can_see_memberships(membership['user']['username'],
data, org_id=1)

@ -0,0 +1,72 @@
# Copyright (C) 2021 Intel Corporation
#
# SPDX-License-Identifier: MIT
from http import HTTPStatus
import pytest
from .utils.config import post_method
class TestCreateInvitations:
def _test_post_invitation_201(self, user, data, invitee, **kwargs):
response = post_method(user, 'invitations', data, **kwargs)
assert response.status_code == HTTPStatus.CREATED
assert data['role'] == response.json()['role']
assert invitee['id'] == response.json()['user']['id']
assert kwargs['org_id'] == response.json()['organization']
def _test_post_invitation_403(self, user, data, **kwargs):
response = post_method(user, 'invitations', data, **kwargs)
assert response.status_code == HTTPStatus.FORBIDDEN
@staticmethod
def get_non_member_users(memberships, users):
organization_users = set(m['user']['id'] for m in memberships if m['user'] != None)
non_member_users = [u for u in users if u['id'] not in organization_users]
return non_member_users
@staticmethod
def get_member(role, memberships, org_id):
member = [m['user'] for m in memberships if m['role'] == role and
m['organization'] == org_id and m['user'] != None][0]
return member
@pytest.mark.parametrize('org_id', [2])
@pytest.mark.parametrize('org_role', ['worker', 'supervisor', 'maintainer', 'owner'])
def test_create_invitation(self, organizations, memberships, users,
org_id, org_role):
member = self.get_member(org_role, memberships, org_id)
non_member_users = self.get_non_member_users(memberships, users)
if org_role in ['worker', 'supervisor']:
for invitee_role in ['worker', 'supervisor', 'maintainer', 'owner']:
self._test_post_invitation_403(member['username'], {
'role': invitee_role,
'email': non_member_users[0]['email']
}, org_id=org_id)
else:
for idx, invitee_role in enumerate(['worker', 'supervisor']):
self._test_post_invitation_201(member['username'], {
'role': invitee_role,
'email': non_member_users[idx]['email']
}, non_member_users[idx], org_id=org_id)
# only the owner can invite a maintainer
if org_role == 'owner':
self._test_post_invitation_201(member['username'], {
'role': 'maintainer',
'email': non_member_users[2]['email']
}, non_member_users[2], org_id=org_id)
else:
self._test_post_invitation_403(member['username'], {
'role': 'maintainer',
'email': non_member_users[3]['email']
}, org_id=org_id)
# nobody can invite an owner
self._test_post_invitation_403(member['username'], {
'role': 'owner',
'email': non_member_users[4]['email']
}, org_id=org_id)

@ -0,0 +1,170 @@
# Copyright (C) 2021 Intel Corporation
#
# SPDX-License-Identifier: MIT
from http import HTTPStatus
from deepdiff import DeepDiff
import pytest
from .utils.config import get_method
def get_job_staff(job, tasks, projects):
job_staff = []
job_staff.append(job['assignee'])
tid = job['task_id']
job_staff.append(tasks[tid]['owner'])
job_staff.append(tasks[tid]['assignee'])
pid = job['project_id']
if pid:
job_staff.append(projects[pid]['owner'])
job_staff.append(projects[pid]['assignee'])
job_staff = set(u['id'] for u in job_staff if u is not None)
return job_staff
def get_org_staff(org_id, memberships):
if org_id in ['', None]:
return set()
else:
return set(m['user']['id'] for m in memberships
if m['role'] in ['maintainer', 'owner'] and m['user'] != None
and m['organization'] == org_id)
def filter_jobs(jobs, tasks, org):
if org is None:
kwargs = {}
jobs = jobs.raw
elif org == '':
kwargs = {'org': ''}
jobs = [job for job in jobs
if tasks[job['task_id']]['organization'] is None]
else:
kwargs = {'org_id': org}
jobs = [job for job in jobs
if tasks[job['task_id']]['organization'] == org]
return jobs, kwargs
def is_org_member(memberships, user, org_id):
if org_id in ['', None]:
return True
else:
return user['id'] in set(m['user']['id'] for m in memberships
if m['user'] != None and m['organization'] == org_id)
class TestGetJobs:
def _test_get_job_200(self, user, jid, data, **kwargs):
response = get_method(user, f'jobs/{jid}', **kwargs)
assert response.status_code == HTTPStatus.OK
assert DeepDiff(data, response.json()) == {}
def _test_get_job_403(self, user, jid, **kwargs):
response = get_method(user, f'jobs/{jid}', **kwargs)
assert response.status_code == HTTPStatus.FORBIDDEN
@pytest.mark.parametrize('org', [None, '', 1, 2])
def test_admin_get_job(self, jobs, tasks, org):
jobs, kwargs = filter_jobs(jobs, tasks, org)
# keep only the reasonable amount of jobs
for job in jobs[:8]:
self._test_get_job_200('admin2', job['id'], job, **kwargs)
@pytest.mark.parametrize('org_id', ['', None, 1, 2])
@pytest.mark.parametrize('groups', [['business'], ['user'], ['worker'], []])
def test_non_admin_get_job(self, org_id, groups, users, jobs, tasks, projects,
memberships):
# keep the reasonable amount of users and jobs
users = [u for u in users if u['groups'] == groups][:4]
jobs, kwargs = filter_jobs(jobs, tasks, org_id)
org_staff = get_org_staff(org_id, memberships)
for job in jobs[:8]:
job_staff = get_job_staff(job, tasks, projects)
# check if the specific user in job_staff to see the job
for user in users:
if user['id'] in job_staff | org_staff:
self._test_get_job_200(user['username'], job['id'], job, **kwargs)
else:
self._test_get_job_403(user['username'], job['id'], **kwargs)
class TestListJobs:
def _test_list_jobs_200(self, user, data, **kwargs):
response = get_method(user, 'jobs', **kwargs, page_size=all)
assert response.status_code == HTTPStatus.OK
assert DeepDiff(data, response.json()['results']) == {}
def _test_list_jobs_403(self, user, **kwargs):
response = get_method(user, 'jobs', **kwargs)
assert response.status_code == HTTPStatus.FORBIDDEN
@pytest.mark.parametrize('org', [None, '', 1, 2])
def test_admin_list_jobs(self, jobs, tasks, org):
jobs, kwargs = filter_jobs(jobs, tasks, org)
self._test_list_jobs_200('admin1', jobs, **kwargs)
@pytest.mark.parametrize('org_id', ['', None, 1, 2])
@pytest.mark.parametrize('groups', [['business'], ['user'], ['worker'], []])
def test_non_admin_list_jobs(self, org_id, groups, users, jobs, tasks,
projects, memberships):
# keep the reasonable amount of users and jobs
users = [u for u in users if u['groups'] == groups][:2]
jobs, kwargs = filter_jobs(jobs, tasks, org_id)
org_staff = get_org_staff(org_id, memberships)
for user in users:
user_jobs = []
for job in jobs:
job_staff = get_job_staff(job, tasks, projects)
if user['id'] in job_staff | org_staff:
user_jobs.append(job)
if is_org_member(memberships, user, org_id):
self._test_list_jobs_200(user['username'], user_jobs, **kwargs)
else:
self._test_list_jobs_403(user['username'], **kwargs)
class TestGetAnnotations:
def _test_get_job_annotations_200(self, user, jid, data, **kwargs):
response = get_method(user, f'jobs/{jid}/annotations', **kwargs)
assert response.status_code == HTTPStatus.OK
assert DeepDiff(data, response.json()) == {}
def _test_get_job_annotations_403(self, user, jid, **kwargs):
response = get_method(user, f'jobs/{jid}/annotations', **kwargs)
assert response.status_code == HTTPStatus.FORBIDDEN
@pytest.mark.parametrize('org', [None, '', 1, 2])
def test_admin_get_job_annotations(self, jobs, tasks, annotations, org):
jobs, kwargs = filter_jobs(jobs, tasks, org)
# keep only the reasonable amount of jobs
for job in jobs[:8]:
jid = str(job['id'])
self._test_get_job_annotations_200('admin2', jid,
annotations['job'][jid], **kwargs)
@pytest.mark.parametrize('org_id', ['', None])
@pytest.mark.parametrize('groups', [['business'], ['user'], ['worker'], []])
def test_non_admin_get_job_annotations(self, org_id, groups, users, jobs, tasks,
projects, annotations, memberships):
users = [u for u in users if u['groups'] == groups][:4]
jobs, kwargs = filter_jobs(jobs, tasks, org_id)
org_staff = get_org_staff(org_id, memberships)
# keep only the reasonable amount of jobs
for job in jobs[:8]:
job_staff = get_job_staff(job, tasks, projects)
jid = str(job['id'])
for user in users:
if user['id'] in job_staff | org_staff:
self._test_get_job_annotations_200(user['username'],
jid, annotations['job'][jid], **kwargs)
else:
self._test_get_job_annotations_403(user['username'],
jid, **kwargs)

@ -21,4 +21,7 @@ def delete_method(username, endpoint, **kwargs):
return requests.delete(get_api_url(endpoint, **kwargs), auth=(username, USER_PASS))
def patch_method(username, endpoint, data, **kwargs):
return requests.patch(get_api_url(endpoint, **kwargs), json=data, auth=(username, USER_PASS))
return requests.patch(get_api_url(endpoint, **kwargs), json=data, auth=(username, USER_PASS))
def post_method(username, endpoint, data, **kwargs):
return requests.post(get_api_url(endpoint, **kwargs), json=data, auth=(username, USER_PASS))

@ -2,8 +2,19 @@ import os.path as osp
from config import get_method, ASSETS_DIR
import json
annotations = {}
for obj in ['user', 'project', 'task', 'job', 'organization', 'membership',
'invitation']:
response = get_method('admin1', obj, page_size='all')
response = get_method('admin1', f'{obj}s', page_size='all')
with open(osp.join(ASSETS_DIR, f'{obj}s.json'), 'w') as f:
json.dump(response.json(), f, indent=2, sort_keys=True)
if obj == 'job':
annotations[obj] = {}
for job in response.json()['results']:
jid = job["id"]
response = get_method('admin1', f'jobs/{jid}/annotations')
annotations[obj][jid] = response.json()
with open(osp.join(ASSETS_DIR, f'annotations.json'), 'w') as f:
json.dump(annotations, f, indent=2, sort_keys=True)

Loading…
Cancel
Save