Fix an issue with permissions (observer can change annotations) (#745)

* Fixed a problem with observer (check_object_permissions method was not called)
* Added a test case to cover issue #712.
main
Nikita Manovich 6 years ago committed by GitHub
parent db19cbfe4b
commit df4589b52e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1653,6 +1653,29 @@ class JobAnnotationAPITestCase(APITestCase):
self._run_api_v1_jobs_id_annotations(self.user, self.assignee, self._run_api_v1_jobs_id_annotations(self.user, self.assignee,
self.assignee) self.assignee)
def test_api_v1_jobs_id_annotations_observer(self):
_, jobs = self._create_task(self.user, self.assignee)
job = jobs[0]
data = {
"version": 0,
"tags": [],
"shapes": [],
"tracks": []
}
response = self._get_api_v1_jobs_id_data(job["id"], self.observer)
self.assertEqual(response.status_code, status.HTTP_200_OK)
response = self._put_api_v1_jobs_id_data(job["id"], self.observer, data)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
response = self._patch_api_v1_jobs_id_data(job["id"], self.observer, "create", data)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
response = self._delete_api_v1_jobs_id_data(job["id"], self.observer)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_api_v1_jobs_id_annotations_no_auth(self): def test_api_v1_jobs_id_annotations_no_auth(self):
self._run_api_v1_jobs_id_annotations(self.user, self.assignee, None) self._run_api_v1_jobs_id_annotations(self.user, self.assignee, None)

@ -187,7 +187,7 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
permissions.append(auth.TaskAccessPermission) permissions.append(auth.TaskAccessPermission)
elif http_method in ["POST"]: elif http_method in ["POST"]:
permissions.append(auth.TaskCreatePermission) permissions.append(auth.TaskCreatePermission)
elif http_method in ["PATCH", "PUT"]: elif self.action == 'annotations' or http_method in ["PATCH", "PUT"]:
permissions.append(auth.TaskChangePermission) permissions.append(auth.TaskChangePermission)
elif http_method in ["DELETE"]: elif http_method in ["DELETE"]:
permissions.append(auth.TaskDeletePermission) permissions.append(auth.TaskDeletePermission)
@ -207,9 +207,9 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
super().perform_destroy(instance) super().perform_destroy(instance)
shutil.rmtree(task_dirname, ignore_errors=True) shutil.rmtree(task_dirname, ignore_errors=True)
@staticmethod
@action(detail=True, methods=['GET'], serializer_class=JobSerializer) @action(detail=True, methods=['GET'], serializer_class=JobSerializer)
def jobs(request, pk): def jobs(self, request, pk):
self.get_object() # force to call check_object_permissions
queryset = Job.objects.filter(segment__task_id=pk) queryset = Job.objects.filter(segment__task_id=pk)
serializer = JobSerializer(queryset, many=True, serializer = JobSerializer(queryset, many=True,
context={"request": request}) context={"request": request})
@ -218,7 +218,7 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
@action(detail=True, methods=['POST'], serializer_class=TaskDataSerializer) @action(detail=True, methods=['POST'], serializer_class=TaskDataSerializer)
def data(self, request, pk): def data(self, request, pk):
db_task = self.get_object() db_task = self.get_object() # call check_object_permissions as well
serializer = TaskDataSerializer(db_task, data=request.data) serializer = TaskDataSerializer(db_task, data=request.data)
if serializer.is_valid(raise_exception=True): if serializer.is_valid(raise_exception=True):
serializer.save() serializer.save()
@ -228,6 +228,7 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
@action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH'], @action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH'],
serializer_class=LabeledDataSerializer) serializer_class=LabeledDataSerializer)
def annotations(self, request, pk): def annotations(self, request, pk):
self.get_object() # force to call check_object_permissions
if request.method == 'GET': if request.method == 'GET':
data = annotation.get_task_data(pk, request.user) data = annotation.get_task_data(pk, request.user)
serializer = LabeledDataSerializer(data=data) serializer = LabeledDataSerializer(data=data)
@ -267,7 +268,7 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
def dump(self, request, pk, filename): def dump(self, request, pk, filename):
filename = re.sub(r'[\\/*?:"<>|]', '_', filename) filename = re.sub(r'[\\/*?:"<>|]', '_', filename)
username = request.user.username username = request.user.username
db_task = self.get_object() db_task = self.get_object() # call check_object_permissions as well
timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S") timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
action = request.query_params.get("action") action = request.query_params.get("action")
if action not in [None, "download"]: if action not in [None, "download"]:
@ -325,6 +326,7 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
@action(detail=True, methods=['GET'], serializer_class=RqStatusSerializer) @action(detail=True, methods=['GET'], serializer_class=RqStatusSerializer)
def status(self, request, pk): def status(self, request, pk):
self.get_object() # force to call check_object_permissions
response = self._get_rq_response(queue="default", response = self._get_rq_response(queue="default",
job_id="/api/{}/tasks/{}".format(request.version, pk)) job_id="/api/{}/tasks/{}".format(request.version, pk))
serializer = RqStatusSerializer(data=response) serializer = RqStatusSerializer(data=response)
@ -350,12 +352,11 @@ class TaskViewSet(auth.TaskGetQuerySetMixin, viewsets.ModelViewSet):
return response return response
@staticmethod
@action(detail=True, methods=['GET'], serializer_class=ImageMetaSerializer, @action(detail=True, methods=['GET'], serializer_class=ImageMetaSerializer,
url_path='frames/meta') url_path='frames/meta')
def data_info(request, pk): def data_info(self, request, pk):
try: try:
db_task = models.Task.objects.get(pk=pk) db_task = self.get_object() # call check_object_permissions as well
meta_cache_file = open(db_task.get_image_meta_cache_path()) meta_cache_file = open(db_task.get_image_meta_cache_path())
except OSError: except OSError:
task.make_image_meta_cache(db_task) task.make_image_meta_cache(db_task)
@ -404,6 +405,7 @@ class JobViewSet(viewsets.GenericViewSet,
@action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH'], @action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH'],
serializer_class=LabeledDataSerializer) serializer_class=LabeledDataSerializer)
def annotations(self, request, pk): def annotations(self, request, pk):
self.get_object() # force to call check_object_permissions
if request.method == 'GET': if request.method == 'GET':
data = annotation.get_job_data(pk, request.user) data = annotation.get_job_data(pk, request.user)
return Response(data) return Response(data)

Loading…
Cancel
Save