REST API tests for IAM (#4090)
parent
90dcadd479
commit
6c96891fc4
Binary file not shown.
File diff suppressed because one or more lines are too long
Binary file not shown.
@ -0,0 +1,7 @@
|
||||
SELECT pg_terminate_backend(pg_stat_activity.pid)
|
||||
FROM pg_stat_activity
|
||||
WHERE pg_stat_activity.datname = 'cvat' AND pid <> pg_backend_pid();
|
||||
|
||||
DROP DATABASE cvat;
|
||||
|
||||
CREATE DATABASE cvat WITH TEMPLATE test_db;
|
||||
@ -0,0 +1,119 @@
|
||||
# Copyright (C) 2021 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
from subprocess import run, CalledProcessError
|
||||
import pytest
|
||||
import json
|
||||
import os.path as osp
|
||||
from .utils.config import ASSETS_DIR
|
||||
|
||||
def cvat_db_container(command):
|
||||
run(('docker exec cvat_db ' + command).split(), check=True) #nosec
|
||||
|
||||
def docker_cp(source, target):
|
||||
run(' '.join(['docker container cp', source, target]).split(), check=True) #nosec
|
||||
|
||||
def restore_data_volume():
|
||||
command = 'docker run --rm --volumes-from cvat --mount ' \
|
||||
f'type=bind,source={ASSETS_DIR},target=/mnt/ ubuntu tar ' \
|
||||
'--strip 3 -C /home/django/data -xjf /mnt/cvat_data.tar.bz2'
|
||||
run(command.split(), check=True) #nosec
|
||||
|
||||
def drop_test_db():
|
||||
cvat_db_container('pg_restore -c -U root -d cvat /cvat_db/cvat_db.dump')
|
||||
cvat_db_container('rm -rf /cvat_db')
|
||||
cvat_db_container('dropdb test_db')
|
||||
|
||||
def create_test_db():
|
||||
docker_cp(source=osp.join(ASSETS_DIR, 'cvat_db'), target='cvat_db:/')
|
||||
cvat_db_container('createdb test_db')
|
||||
cvat_db_container('pg_restore -U root -d test_db /cvat_db/cvat_db.dump')
|
||||
|
||||
@pytest.fixture(scope='session', autouse=True)
|
||||
def init_test_db():
|
||||
try:
|
||||
restore_data_volume()
|
||||
create_test_db()
|
||||
except CalledProcessError:
|
||||
drop_test_db()
|
||||
pytest.exit(f"Cannot to initialize test DB")
|
||||
|
||||
yield
|
||||
|
||||
drop_test_db()
|
||||
|
||||
@pytest.fixture(scope='function', autouse=True)
|
||||
def restore_cvat_db():
|
||||
cvat_db_container('psql -U root -d postgres -f /cvat_db/cvat_db.sql')
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def users():
|
||||
with open(osp.join(ASSETS_DIR, 'users.json')) as f:
|
||||
return json.load(f)['results']
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def organizations():
|
||||
with open(osp.join(ASSETS_DIR, 'organizations.json')) as f:
|
||||
data = json.load(f)
|
||||
|
||||
def _organizations(org_id=None):
|
||||
if org_id:
|
||||
return [org for org in data if org['id'] == org_id][0]
|
||||
return data
|
||||
|
||||
return _organizations
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def memberships():
|
||||
with open(osp.join(ASSETS_DIR, 'memberships.json')) as f:
|
||||
return json.load(f)['results']
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def users_by_name(users):
|
||||
return {user['username']: user for user in users}
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def find_users(test_db):
|
||||
def find(**kwargs):
|
||||
assert len(kwargs) > 0
|
||||
assert any(kwargs)
|
||||
|
||||
data = test_db
|
||||
kwargs = dict(filter(lambda a: a[1] is not None, kwargs.items()))
|
||||
for field, value in kwargs.items():
|
||||
if field.startswith('exclude_'):
|
||||
field = field.split('_', maxsplit=1)[1]
|
||||
exclude_rows = set(v['id'] for v in
|
||||
filter(lambda a: a[field] == value, test_db))
|
||||
data = list(filter(lambda a: a['id'] not in exclude_rows, data))
|
||||
else:
|
||||
data = list(filter(lambda a: a[field] == value, data))
|
||||
|
||||
return data
|
||||
return find
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def test_db(users, users_by_name, memberships):
|
||||
data = []
|
||||
fields = ['username', 'id', 'privilege', 'role', 'org', 'membership_id']
|
||||
def add_row(**kwargs):
|
||||
data.append({field: kwargs.get(field) for field in fields})
|
||||
|
||||
for user in users:
|
||||
for group in user['groups']:
|
||||
add_row(username=user['username'], id=user['id'], privilege=group)
|
||||
|
||||
for membership in memberships:
|
||||
username = membership['user']['username']
|
||||
for group in users_by_name[username]['groups']:
|
||||
add_row(username=username, role=membership['role'], privilege=group,
|
||||
id=membership['user']['id'], org=membership['organization'],
|
||||
membership_id=membership['id'])
|
||||
|
||||
return data
|
||||
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,52 @@
|
||||
# Copyright (C) 2021 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
from http import HTTPStatus
|
||||
from deepdiff import DeepDiff
|
||||
|
||||
from .utils.config import get_method
|
||||
|
||||
class TestGetUsers:
|
||||
def _test_can_see(self, user, data, endpoint='users', exclude_paths='', **kwargs):
|
||||
response = get_method(user, endpoint, **kwargs)
|
||||
response_data = response.json()
|
||||
response_data = response_data.get('results', response_data)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert DeepDiff(data, response_data, ignore_order=True,
|
||||
exclude_paths=exclude_paths) == {}
|
||||
|
||||
def _test_cannot_see(self, user, endpoint='users', **kwargs):
|
||||
response = get_method(user, endpoint, **kwargs)
|
||||
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
def test_admin_can_see_all_others(self, users):
|
||||
exclude_paths = [f"root[{i}]['last_login']" for i in range(len(users))]
|
||||
self._test_can_see('admin2', users, exclude_paths=exclude_paths,
|
||||
page_size="all")
|
||||
|
||||
def test_everybody_can_see_self(self, users_by_name):
|
||||
for user, data in users_by_name.items():
|
||||
self._test_can_see(user, data, "users/self", "root['last_login']")
|
||||
|
||||
def test_non_members_cannot_see_list_of_members(self):
|
||||
self._test_cannot_see('user2', org='org1')
|
||||
|
||||
def test_non_admin_cannot_see_others(self, users):
|
||||
non_admins = (v for v in users if not v['is_superuser'])
|
||||
user = next(non_admins)['username']
|
||||
user_id = next(non_admins)['id']
|
||||
|
||||
self._test_cannot_see(user, f"users/{user_id}")
|
||||
|
||||
def test_all_members_can_see_list_of_members(self, find_users, users):
|
||||
org_members = [user['username'] for user in find_users(org=1)]
|
||||
available_fields = ['url', 'id', 'username', 'first_name', 'last_name']
|
||||
|
||||
data = [dict(filter(lambda row: row[0] in available_fields, user.items()))
|
||||
for user in users if user['username'] in org_members]
|
||||
|
||||
for member in org_members:
|
||||
self._test_can_see(member, data, org='org1')
|
||||
@ -1,35 +0,0 @@
|
||||
# Copyright (C) 2021 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
import os
|
||||
from http import HTTPStatus
|
||||
import requests
|
||||
import json
|
||||
from .utils import config
|
||||
from deepdiff import DeepDiff
|
||||
|
||||
|
||||
def test_non_admin_cannot_see_others():
|
||||
for username in ['dummy1', 'worker1', 'user1', 'business1']:
|
||||
response = requests.get(config.get_api_url('users'), auth=(username, config.USER_PASS))
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()['count'] == 1
|
||||
|
||||
def test_admin_can_see_all_others():
|
||||
response = requests.get(config.get_api_url('users'), auth=('admin2', config.USER_PASS))
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
with open(os.path.join(config.ASSETS_DIR, 'users.json')) as f:
|
||||
data = json.load(f)
|
||||
assert response.json()['count'] == data['count']
|
||||
|
||||
def test_everybody_can_see_self():
|
||||
with open(os.path.join(config.ASSETS_DIR, 'users.json')) as f:
|
||||
data = json.load(f)['results']
|
||||
users = {user['username']:user for user in data}
|
||||
|
||||
for username in ['dummy1', 'worker1', 'user1', 'business1', 'admin1']:
|
||||
response = requests.get(config.get_api_url('users/self'), auth=(username, config.USER_PASS))
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert DeepDiff(users[username], response.json(), ignore_order=True,
|
||||
exclude_paths="root['last_login']") == {}
|
||||
@ -0,0 +1,79 @@
|
||||
# Copyright (C) 2021 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
import pytest
|
||||
from http import HTTPStatus
|
||||
from deepdiff import DeepDiff
|
||||
|
||||
from .utils.config import get_method, patch_method
|
||||
|
||||
class TestGetMemberships:
|
||||
def _test_can_see_memberships(self, user, data, **kwargs):
|
||||
response = get_method(user, 'memberships', **kwargs)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert DeepDiff(data, response.json()['results']) == {}
|
||||
|
||||
def _test_cannot_see_memberships(self, user, **kwargs):
|
||||
response = get_method(user, 'memberships', **kwargs)
|
||||
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
def test_admin_can_see_all_memberships(self, memberships):
|
||||
self._test_can_see_memberships('admin2', memberships, page_size='all')
|
||||
|
||||
def test_non_admin_can_see_only_self_memberships(self, memberships):
|
||||
non_admins= ['business1', 'user1', 'dummy1','worker2']
|
||||
for user in non_admins:
|
||||
data = [m for m in memberships if m['user']['username'] == user]
|
||||
self._test_can_see_memberships(user, data)
|
||||
|
||||
def test_all_members_can_see_other_members_membership(self, memberships):
|
||||
data = [m for m in memberships if m['organization'] == 1]
|
||||
for membership in data:
|
||||
self._test_can_see_memberships(membership['user']['username'],
|
||||
data, org_id=1)
|
||||
|
||||
def test_non_members_cannot_see_members_membership(self):
|
||||
non_org1_users = ['user2', 'worker3']
|
||||
for user in non_org1_users:
|
||||
self._test_cannot_see_memberships(user, org_id=1)
|
||||
|
||||
|
||||
class TestPatchMemberships:
|
||||
_ORG = 2
|
||||
|
||||
def _test_can_change_membership(self, user, membership_id, new_role):
|
||||
response = patch_method(user, f"memberships/{membership_id}",
|
||||
{'role': new_role}, org_id=self._ORG)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()['role'] == new_role
|
||||
|
||||
def _test_cannot_change_membership(self, user, membership_id, new_role):
|
||||
response = patch_method(user, f"memberships/{membership_id}",
|
||||
{'role': new_role}, org_id=self._ORG)
|
||||
|
||||
assert response.status_code == HTTPStatus.FORBIDDEN
|
||||
|
||||
@pytest.mark.parametrize('who, whom, new_role, is_allow', [
|
||||
('supervisor', 'worker', 'supervisor', False),
|
||||
('supervisor', 'maintainer', 'supervisor', False),
|
||||
('worker', 'supervisor', 'worker', False),
|
||||
('worker', 'maintainer', 'worker', False),
|
||||
('maintainer', 'maintainer', 'worker', False),
|
||||
('maintainer', 'supervisor', 'worker', True),
|
||||
('maintainer', 'worker', 'supervisor', True),
|
||||
('owner', 'maintainer', 'worker', True),
|
||||
('owner', 'supervisor', 'worker', True),
|
||||
('owner', 'worker', 'supervisor', True),
|
||||
])
|
||||
def test_user_can_change_role_of_member(self, who, whom, new_role, is_allow, find_users):
|
||||
user = find_users(org=self._ORG, role=who)[0]['username']
|
||||
membership_id = find_users(org=self._ORG, role=whom)[1]['membership_id']
|
||||
|
||||
if is_allow:
|
||||
self._test_can_change_membership(user, membership_id, new_role)
|
||||
else:
|
||||
self._test_cannot_change_membership(user, membership_id, new_role)
|
||||
@ -1,14 +1,10 @@
|
||||
import os
|
||||
import requests
|
||||
import os.path as osp
|
||||
from config import get_method, ASSETS_DIR
|
||||
import json
|
||||
import config
|
||||
|
||||
with requests.Session() as session:
|
||||
session.auth = ('admin1', config.USER_PASS)
|
||||
|
||||
for obj in ['user', 'project', 'task', 'job', 'organization', 'membership',
|
||||
'invitation']:
|
||||
response = session.get(f'http://localhost:8080/api/v1/{obj}s?page_size=all')
|
||||
with open(os.path.join(config.ASSETS_DIR, f'{obj}s.json'), 'w') as f:
|
||||
json.dump(response.json(), f, indent=2, sort_keys=True)
|
||||
for obj in ['user', 'project', 'task', 'job', 'organization', 'membership',
|
||||
'invitation']:
|
||||
response = get_method('admin1', obj, page_size='all')
|
||||
with open(osp.join(ASSETS_DIR, f'{obj}s.json'), 'w') as f:
|
||||
json.dump(response.json(), f, indent=2, sort_keys=True)
|
||||
|
||||
|
||||
Loading…
Reference in New Issue