Analytics v2 (#127)

* Add revproxy dependency
* Fix formatting of task.log, improve fields of server events.
* Mount kibana app into /analytics/*
* Add logs for revproxy and x-forwarded-user header.
* Added F3 shortcut for analytics/log-viewer.
main
Nikita Manovich 7 years ago committed by GitHub
parent c5550899db
commit 45af7bdee3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -26,8 +26,6 @@ services:
context: ./analytics/kibana
args:
ELK_VERSION: 6.4.0
ports:
- "5601:5601"
depends_on: ['cvat_elasticsearch']
restart: always
@ -61,9 +59,11 @@ services:
cvat:
environment:
DJANGO_LOG_SERVER_HOST: "logstash"
DJANGO_LOG_SERVER_HOST: logstash
DJANGO_LOG_SERVER_PORT: 5000
no_proxy: logstash,${no_proxy}
DJANGO_LOG_VIEWER_HOST: kibana
DJANGO_LOG_VIEWER_PORT: 5601
no_proxy: kibana,logstash,${no_proxy}
volumes:
cvat_events:

@ -1,3 +1,4 @@
ARG ELK_VERSION
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:${ELK_VERSION}
COPY --chown=elasticsearch:elasticsearch elasticsearch.yml /usr/share/elasticsearch/config/

@ -1,4 +1,5 @@
server.host: 0.0.0.0
elasticsearch.url: http://elasticsearch:9200
elasticsearch.requestHeadersWhitelist: [ cookie, authorization, x-forwarded-user ]
elasticsearch.requestHeadersWhitelist: [ "cookie", "authorization", "x-forwarded-user" ]
kibana.defaultAppId: "discover"
server.basePath: /analytics

@ -54,10 +54,24 @@ filter {
replace => { "type" => "client" }
}
} else if [logger_name] =~ /cvat.server/ {
# 1. Remove unnecessary field from it
# 2. Type it as server
# 1. Remove 'logger_name' field and create 'task' field
# 2. Remove unnecessary field from it
# 3. Type it as server
if [logger_name] =~ /cvat\.server\.task_[0-9]+/ {
mutate {
rename => { "logger_name" => "task" }
gsub => [ "task", "cvat.server.task_", "" ]
}
# Need to split the mutate because otherwise the conversion
# doesn't work.
mutate {
convert => { "task" => "integer" }
}
}
prune {
blacklist_names => ["host", "port"]
blacklist_names => ["host", "port", "stack_info"]
}
mutate {

@ -5,7 +5,7 @@
*/
Mousetrap.bind(window.cvat.config.shortkeys["open_help"].value, function() {
window.location.href = "/documentation/user_guide.html";
window.open("/documentation/user_guide.html");
return false;
});

@ -22,7 +22,7 @@ from django.db import transaction
from . import models
from .task import get_frame_path, get_image_meta_cache
from .logging import task_logger, job_logger
from .log import slogger
############################# Low Level server API
@ -114,7 +114,7 @@ def save_task(tid, data):
# pylint: disable=unused-argument
def rq_handler(job, exc_type, exc_value, traceback):
tid = job.id.split('/')[1]
task_logger[tid].error("dump annotation error was occured", exc_info=True)
slogger.task[tid].error("dump annotation error was occured", exc_info=True)
##################################################
@ -407,7 +407,7 @@ class _AnnotationForJob(_Annotation):
# pylint: disable=bad-continuation
self.db_job = db_job
self.logger = job_logger[db_job.id]
self.logger = slogger.job[db_job.id]
self.db_labels = {db_label.id:db_label
for db_label in db_job.segment.task.label_set.all()}
self.db_attributes = {db_attr.id:db_attr

@ -4,9 +4,8 @@
import os
import logging
from . import models
from cvat.settings.base import LOGGING
from cvat.apps.engine.models import Job, Task
from .models import Job, Task
def _get_task(tid):
try:
@ -16,7 +15,7 @@ def _get_task(tid):
def _get_job(jid):
try:
return models.Job.objects.select_related("segment__task").get(id=jid)
return Job.objects.select_related("segment__task").get(id=jid)
except Exception:
raise Exception('{} key must be a job identifier'.format(jid))
@ -34,6 +33,8 @@ class TaskLoggerStorage:
logger = logging.getLogger('cvat.server.task_{}'.format(tid))
server_file = logging.FileHandler(filename=task.get_log_path())
formatter = logging.Formatter(LOGGING['formatters']['standard']['format'])
server_file.setFormatter(formatter)
logger.addHandler(server_file)
return logger
@ -49,7 +50,7 @@ class JobLoggerStorage:
def _get_task_logger(self, jid):
job = _get_job(jid)
return task_logger[job.segment.task.id]
return slogger.task[job.segment.task.id]
class TaskClientLoggerStorage:
def __init__(self):
@ -79,10 +80,21 @@ class JobClientLoggerStorage:
def _get_task_logger(self, jid):
job = _get_job(jid)
return task_client_logger[job.segment.task.id]
task_logger = TaskLoggerStorage()
job_logger = JobLoggerStorage()
global_logger = logging.getLogger('cvat.server')
job_client_logger = JobClientLoggerStorage()
task_client_logger = TaskClientLoggerStorage()
return clogger.task[job.segment.task.id]
class dotdict(dict):
    """Dictionary whose keys can also be read, set and deleted as attributes.

    Reading a key that is absent yields ``None`` (the same result as
    ``dict.get``), with one exception: absent double-underscore names raise
    ``AttributeError``. Protocol probes such as ``__deepcopy__`` or
    ``__getnewargs_ex__`` must see "no such attribute" rather than ``None``,
    otherwise ``hasattr`` reports them as present and ``copy.deepcopy`` or
    pickling tries to call ``None``.

    Setting and deleting attributes operate on the dictionary items directly;
    deleting an absent key raises ``KeyError`` (behaviour of
    ``dict.__delitem__``).
    """

    def __getattr__(self, name):
        # Only reached when regular attribute lookup failed, so genuine
        # dict methods and class attributes are never shadowed by keys.
        try:
            return self[name]
        except KeyError:
            if name.startswith('__') and name.endswith('__'):
                raise AttributeError(name) from None
            return None

    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__
clogger = dotdict({
'task': TaskClientLoggerStorage(),
'job': JobClientLoggerStorage()
})
slogger = dotdict({
'task': TaskLoggerStorage(),
'job': JobLoggerStorage(),
'glob': logging.getLogger('cvat.server'),
})

@ -192,6 +192,12 @@ class Config {
description: "open settings window "
},
open_analytics: {
value: "f3",
view_value: "F3",
description: "open analytics window"
},
save_work: {
value: "ctrl+s",
view_value: "Ctrl + S",

@ -3,16 +3,12 @@
#
# SPDX-License-Identifier: MIT
import csv
import os
import re
import rq
import sys
import rq
import shlex
import logging
import shutil
import tempfile
from io import StringIO
from PIL import Image
from traceback import print_exception
from ast import literal_eval
@ -30,7 +26,7 @@ from pyunpack import Archive
from distutils.dir_util import copy_tree
from . import models
from .logging import task_logger, job_logger, global_logger
from .log import slogger
############################# Low Level server API
@ -579,7 +575,8 @@ def _save_task_to_db(db_task, task_params):
for x in range(0, db_task.size, segment_step):
start_frame = x
stop_frame = min(x + task_params['segment'] - 1, db_task.size - 1)
global_logger.info("New segment for task #{}: start_frame = {}, stop_frame = {}".format(db_task.id, start_frame, stop_frame))
slogger.glob.info("New segment for task #{}: start_frame = {}, \
stop_frame = {}".format(db_task.id, start_frame, stop_frame))
db_segment = models.Segment()
db_segment.task = db_task
@ -613,7 +610,7 @@ def _create_thread(tid, params):
raise Exception('Only one archive, one video or many images can be dowloaded simultaneously. \
{} image(s), {} dir(s), {} video(s), {} archive(s) found'.format(images, dirs, videos, archives))
global_logger.info("create task #{}".format(tid))
slogger.glob.info("create task #{}".format(tid))
job = rq.get_current_job()
db_task = models.Task.objects.select_for_update().get(pk=tid)
@ -657,13 +654,13 @@ def _create_thread(tid, params):
}
task_params['overlap'] = int(params.get('overlap_size', 5 if task_params['mode'] == 'interpolation' else 0))
task_params['overlap'] = min(task_params['overlap'], task_params['segment'] - 1)
global_logger.info("Task #{} parameters: {}".format(tid, task_params))
slogger.glob.info("Task #{} parameters: {}".format(tid, task_params))
if task_params['mode'] == 'interpolation':
_find_and_extract_video(upload_dir, output_dir, db_task, task_params['compress'], task_params['flip'], job)
else:
_find_and_compress_images(upload_dir, output_dir, db_task, task_params['compress'], task_params['flip'], job)
global_logger.info("Founded frames {} for task #{}".format(db_task.size, tid))
slogger.glob.info("Founded frames {} for task #{}".format(db_task.size, tid))
job.meta['status'] = 'Task is being saved in database'
job.save_meta()

@ -9,7 +9,7 @@ from . import views
urlpatterns = [
path('', views.dispatch_request),
path('create/task', views.create_task),
path('get/task/<str:tid>/frame/<int:frame>', views.get_frame),
path('get/task/<int:tid>/frame/<int:frame>', views.get_frame),
path('check/task/<int:tid>', views.check_task),
path('delete/task/<int:tid>', views.delete_task),
path('update/task/<int:tid>', views.update_task),

@ -19,7 +19,7 @@ from cvat.settings.base import JS_3RDPARTY
from cvat.apps.authentication.decorators import login_required
from requests.exceptions import RequestException
import logging
from .logging import task_logger, job_logger, global_logger, job_client_logger
from .log import slogger, clogger
############################# High Level server API
@login_required
@ -27,7 +27,7 @@ from .logging import task_logger, job_logger, global_logger, job_client_logger
def catch_client_exception(request, jid):
data = json.loads(request.body.decode('utf-8'))
for event in data['exceptions']:
job_client_logger[jid].error(json.dumps(event))
clogger.job[jid].error(json.dumps(event))
return HttpResponse()
@ -49,7 +49,7 @@ def create_task(request):
db_task = None
params = request.POST.dict()
params['owner'] = request.user
global_logger.info("create task with params = {}".format(params))
slogger.glob.info("create task with params = {}".format(params))
try:
db_task = task.create_empty(params)
target_paths = []
@ -94,7 +94,7 @@ def create_task(request):
return JsonResponse({'tid': db_task.id})
except Exception as exc:
global_logger.error("cannot create task {}".format(params['task_name']), exc_info=True)
slogger.glob.error("cannot create task {}".format(params['task_name']), exc_info=True)
db_task.delete()
return HttpResponseBadRequest(str(exc))
@ -106,10 +106,10 @@ def check_task(request, tid):
"""Check the status of a task"""
try:
global_logger.info("check task #{}".format(tid))
slogger.glob.info("check task #{}".format(tid))
response = task.check(tid)
except Exception as e:
global_logger.error("cannot check task #{}".format(tid), exc_info=True)
slogger.glob.error("cannot check task #{}".format(tid), exc_info=True)
return HttpResponseBadRequest(str(e))
return JsonResponse(response)
@ -125,7 +125,7 @@ def get_frame(request, tid, frame):
path = os.path.realpath(task.get_frame_path(tid, frame))
return sendfile(request, path)
except Exception as e:
task_logger[tid].error("cannot get frame #{}".format(frame), exc_info=True)
slogger.task[tid].error("cannot get frame #{}".format(frame), exc_info=True)
return HttpResponseBadRequest(str(e))
@login_required
@ -133,13 +133,13 @@ def get_frame(request, tid, frame):
def delete_task(request, tid):
"""Delete the task"""
try:
global_logger.info("delete task #{}".format(tid))
slogger.glob.info("delete task #{}".format(tid))
if not task.is_task_owner(request.user, tid):
return HttpResponseBadRequest("You don't have permissions to delete the task.")
task.delete(tid)
except Exception as e:
global_logger.error("cannot delete task #{}".format(tid), exc_info=True)
slogger.glob.error("cannot delete task #{}".format(tid), exc_info=True)
return HttpResponseBadRequest(str(e))
return HttpResponse()
@ -149,14 +149,14 @@ def delete_task(request, tid):
def update_task(request, tid):
"""Update labels for the task"""
try:
task_logger[tid].info("update task request")
slogger.task[tid].info("update task request")
if not task.is_task_owner(request.user, tid):
return HttpResponseBadRequest("You don't have permissions to change the task.")
labels = request.POST['labels']
task.update(tid, labels)
except Exception as e:
task_logger[tid].error("cannot update task", exc_info=True)
slogger.task[tid].error("cannot update task", exc_info=True)
return HttpResponseBadRequest(str(e))
return HttpResponse()
@ -165,10 +165,10 @@ def update_task(request, tid):
@permission_required(perm='engine.view_task', raise_exception=True)
def get_task(request, tid):
try:
task_logger[tid].info("get task request")
slogger.task[tid].info("get task request")
response = task.get(tid)
except Exception as e:
task_logger[tid].error("cannot get task", exc_info=True)
slogger.task[tid].error("cannot get task", exc_info=True)
return HttpResponseBadRequest(str(e))
return JsonResponse(response, safe=False)
@ -177,10 +177,10 @@ def get_task(request, tid):
@permission_required(perm=['engine.view_task', 'engine.view_annotation'], raise_exception=True)
def get_job(request, jid):
try:
job_logger[jid].info("get job #{} request".format(jid))
slogger.job[jid].info("get job #{} request".format(jid))
response = task.get_job(jid)
except Exception as e:
job_logger[jid].error("cannot get job #{}".format(jid), exc_info=True)
slogger.job[jid].error("cannot get job #{}".format(jid), exc_info=True)
return HttpResponseBadRequest(str(e))
return JsonResponse(response, safe=False)
@ -189,10 +189,10 @@ def get_job(request, jid):
@permission_required(perm=['engine.view_task', 'engine.view_annotation'], raise_exception=True)
def dump_annotation(request, tid):
try:
task_logger[tid].info("dump annotation request")
slogger.task[tid].info("dump annotation request")
annotation.dump(tid, annotation.FORMAT_XML, request.scheme, request.get_host())
except Exception as e:
task_logger[tid].error("cannot dump annotation", exc_info=True)
slogger.task[tid].error("cannot dump annotation", exc_info=True)
return HttpResponseBadRequest(str(e))
return HttpResponse()
@ -202,10 +202,10 @@ def dump_annotation(request, tid):
@permission_required(perm=['engine.view_task', 'engine.view_annotation'], raise_exception=True)
def check_annotation(request, tid):
try:
task_logger[tid].info("check annotation")
slogger.task[tid].info("check annotation")
response = annotation.check(tid)
except Exception as e:
task_logger[tid].error("cannot check annotation", exc_info=True)
slogger.task[tid].error("cannot check annotation", exc_info=True)
return HttpResponseBadRequest(str(e))
return JsonResponse(response)
@ -216,12 +216,12 @@ def check_annotation(request, tid):
@permission_required(perm=['engine.view_task', 'engine.view_annotation'], raise_exception=True)
def download_annotation(request, tid):
try:
task_logger[tid].info("get dumped annotation")
slogger.task[tid].info("get dumped annotation")
db_task = models.Task.objects.get(pk=tid)
response = sendfile(request, db_task.get_dump_path(), attachment=True,
attachment_filename='{}_{}.xml'.format(db_task.id, db_task.name))
except Exception as e:
task_logger[tid].error("cannot get dumped annotation", exc_info=True)
slogger.task[tid].error("cannot get dumped annotation", exc_info=True)
return HttpResponseBadRequest(str(e))
return response
@ -232,10 +232,10 @@ def download_annotation(request, tid):
@permission_required(perm=['engine.view_task', 'engine.view_annotation'], raise_exception=True)
def get_annotation(request, jid):
try:
job_logger[jid].info("get annotation for {} job".format(jid))
slogger.job[jid].info("get annotation for {} job".format(jid))
response = annotation.get(jid)
except Exception as e:
job_logger[jid].error("cannot get annotation for job {}".format(jid), exc_info=True)
slogger.job[jid].error("cannot get annotation for job {}".format(jid), exc_info=True)
return HttpResponseBadRequest(str(e))
return JsonResponse(response, safe=False)
@ -244,18 +244,18 @@ def get_annotation(request, jid):
@permission_required(perm=['engine.view_task', 'engine.change_annotation'], raise_exception=True)
def save_annotation_for_job(request, jid):
try:
job_logger[jid].info("save annotation for {} job".format(jid))
slogger.job[jid].info("save annotation for {} job".format(jid))
data = json.loads(request.body.decode('utf-8'))
if 'annotation' in data:
annotation.save_job(jid, json.loads(data['annotation']))
if 'logs' in data:
for event in json.loads(data['logs']):
job_client_logger[jid].info(json.dumps(event))
clogger.job[jid].info(json.dumps(event))
except RequestException as e:
job_logger[jid].error("cannot send annotation logs for job {}".format(jid), exc_info=True)
slogger.job[jid].error("cannot send annotation logs for job {}".format(jid), exc_info=True)
return HttpResponseBadRequest(str(e))
except Exception as e:
job_logger[jid].error("cannot save annotation for job {}".format(jid), exc_info=True)
slogger.job[jid].error("cannot save annotation for job {}".format(jid), exc_info=True)
return HttpResponseBadRequest(str(e))
return HttpResponse()
@ -265,11 +265,11 @@ def save_annotation_for_job(request, jid):
@permission_required(perm=['engine.view_task', 'engine.change_annotation'], raise_exception=True)
def save_annotation_for_task(request, tid):
try:
task_logger[tid].info("save annotation request")
slogger.task[tid].info("save annotation request")
data = json.loads(request.body.decode('utf-8'))
annotation.save_task(tid, data)
except Exception as e:
task_logger[tid].error("cannot save annotation", exc_info=True)
slogger.task[tid].error("cannot save annotation", exc_info=True)
return HttpResponseBadRequest(str(e))
return HttpResponse()

@ -0,0 +1,7 @@
# Copyright (C) 2018 Intel Corporation
#
# SPDX-License-Identifier: MIT
from cvat.settings.base import JS_3RDPARTY
# Register the log viewer shortcut script in the 'dashboard' entry of
# JS_3RDPARTY at import time. The existing list (or an empty one when the key
# is absent) is concatenated with the new script, so earlier entries are kept
# and the key is rebound to a new list instead of being mutated in place.
JS_3RDPARTY['dashboard'] = JS_3RDPARTY.get('dashboard', []) + ['log_viewer/js/shortcuts.js']

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

@ -0,0 +1,5 @@
from django.apps import AppConfig
class LogViewerConfig(AppConfig):
    """Django application configuration for the log viewer app."""

    # Django treats ``name`` as the importable dotted path of the application
    # package, so it has to be the full path the project uses for this app
    # rather than the bare package name. The default label (last path
    # component, 'log_viewer') is unchanged by this.
    name = 'cvat.apps.log_viewer'

@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

@ -0,0 +1,11 @@
/*
* Copyright (C) 2018 Intel Corporation
*
* SPDX-License-Identifier: MIT
*/
// Bind the key combination configured under shortkeys["open_analytics"] to
// open the analytics UI ("/analytics/app/kibana") through window.open.
// The handler returns false; presumably this is Mousetrap's convention for
// suppressing the browser's default action for the key - confirm against the
// Mousetrap documentation.
Mousetrap.bind(window.cvat.config.shortkeys["open_analytics"].value, function() {
window.open("/analytics/app/kibana");
return false;
});

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

@ -0,0 +1,12 @@
# Copyright (C) 2018 Intel Corporation
#
# SPDX-License-Identifier: MIT
from django.urls import path
from . import views
# Single catch-all route: the captured ``path`` value is handed to
# LogViewerProxy as a keyword argument. NOTE(review): the ``path`` converter
# only matches non-empty strings, so a request for the bare mount point with
# nothing after it would not be routed here - confirm that is intended.
urlpatterns = [
path('<path:path>', views.LogViewerProxy.as_view())
]

@ -0,0 +1,16 @@
import os
from revproxy.views import ProxyView
from cvat.apps.authentication.decorators import login_required
from django.utils.decorators import method_decorator
@method_decorator(login_required, name='dispatch')
class LogViewerProxy(ProxyView):
    """Reverse proxy view in front of the log viewer service.

    ``dispatch`` is wrapped with the project's ``login_required`` decorator.
    The upstream address is built once, when this module is imported, from
    the DJANGO_LOG_VIEWER_HOST and DJANGO_LOG_VIEWER_PORT environment
    variables.
    """

    upstream = 'http://{}:{}'.format(os.getenv('DJANGO_LOG_VIEWER_HOST'),
        os.getenv('DJANGO_LOG_VIEWER_PORT'))
    # revproxy option; presumably makes the base view add a REMOTE_USER
    # header for the current user - confirm against the revproxy docs.
    add_remote_user = True

    def get_request_headers(self):
        """Return the upstream request headers with X-Forwarded-User set.

        The value is taken from the REMOTE_USER header produced by the base
        class. When that header is absent, the username of the request's user
        is used instead of failing with ``KeyError``.
        """
        headers = super().get_request_headers()
        remote_user = headers.get('REMOTE_USER')
        if remote_user is None:
            # NOTE(review): the base view may skip REMOTE_USER (e.g. for a
            # user it does not consider active - TODO confirm). Setting the
            # header ourselves also means a client-supplied X-Forwarded-User
            # can never reach the upstream unchanged.
            remote_user = self.request.user.get_username()
        headers['X-Forwarded-User'] = remote_user
        return headers

@ -21,8 +21,7 @@ import rq
import tensorflow as tf
import numpy as np
from PIL import Image
_logger = logging.getLogger(__name__)
from .log import slogger
def load_image_into_numpy(image):
(im_width, im_height) = image.size
@ -153,7 +152,7 @@ def create_thread(id, labels_mapping):
# Run auto annotation by tf
result = run_annotation(image_list, labels_mapping, TRESHOLD)
if result is None:
_logger.info('tf annotation for task {} canceled by user'.format(id))
slogger.glob.info('tf annotation for task {} canceled by user'.format(id))
return
# Modify data format and save
@ -161,30 +160,30 @@ def create_thread(id, labels_mapping):
annotation.save_task(id, result)
db_task.status = "Annotation"
db_task.save()
_logger.info('tf annotation for task {} done'.format(id))
slogger.glob.info('tf annotation for task {} done'.format(id))
except Exception:
_logger.exception('exception was occured during tf annotation of the task {}'.format(id))
slogger.glob.exception('exception was occured during tf annotation of the task {}'.format(id))
db_task.status = "TF Annotation Fault"
db_task.save()
@login_required
@permission_required(perm=['engine.view_task', 'engine.change_annotation'], raise_exception=True)
def create(request, tid):
_logger.info('tf annotation create request for task {}'.format(tid))
slogger.glob.info('tf annotation create request for task {}'.format(tid))
try:
db_task = TaskModel.objects.get(pk=tid)
except ObjectDoesNotExist:
_logger.exception('task with id {} not found'.format(tid))
slogger.glob.exception('task with id {} not found'.format(tid))
return HttpResponseBadRequest("A task with this ID was not found")
if not task.is_task_owner(request.user, tid):
_logger.error('not enought of permissions for tf annotation of the task {}'.format(tid))
slogger.glob.error('not enought of permissions for tf annotation of the task {}'.format(tid))
return HttpResponseBadRequest("You don't have permissions to tf annotation of the task.")
queue = django_rq.get_queue('low')
job = queue.fetch_job('tf_annotation.create/{}'.format(tid))
if job is not None and (job.is_started or job.is_queued):
_logger.error('tf annotation for task {} already running'.format(tid))
slogger.glob.error('tf annotation for task {} already running'.format(tid))
return HttpResponseBadRequest("The process is already running")
db_labels = db_task.label_set.prefetch_related('attributespec_set').all()
db_labels = {db_label.id:db_label.name for db_label in db_labels}
@ -214,7 +213,7 @@ def create(request, tid):
labels_mapping[tf_annotation_labels[labels]] = key
if not len(labels_mapping.values()):
_logger.error('no labels found for task {} tf annotation'.format(tid))
slogger.glob.error('no labels found for task {} tf annotation'.format(tid))
return HttpResponseBadRequest("No labels found for tf annotation")
db_task.status = "TF Annotation"
@ -225,7 +224,7 @@ def create(request, tid):
args=(tid, labels_mapping),
job_id='tf_annotation.create/{}'.format(tid),
timeout=604800) # 7 days
_logger.info('tf annotation job enqueued for task {} with labels {}'.format(tid, labels_mapping))
slogger.glob.info('tf annotation job enqueued for task {} with labels {}'.format(tid, labels_mapping))
return HttpResponse()

@ -23,3 +23,4 @@ sqlparse==0.2.4
django-sendfile==0.3.11
dj-pagination==2.3.2
python-logstash==0.4.6
django-revproxy==0.9.15

@ -54,11 +54,15 @@ INSTALLED_APPS = [
'cacheops',
'sendfile',
'dj_pagination',
'revproxy'
]
if 'yes' == os.environ.get('TF_ANNOTATION', 'no'):
INSTALLED_APPS += ['cvat.apps.tf_annotation']
if os.getenv('DJANGO_LOG_VIEWER_HOST'):
INSTALLED_APPS += ['cvat.apps.log_viewer']
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
@ -213,6 +217,11 @@ LOGGING = {
'cvat.client': {
'handlers': [],
'level': os.getenv('DJANGO_LOG_LEVEL', 'DEBUG'),
},
'revproxy': {
'handlers': ['console', 'server_file'],
'level': os.getenv('DJANGO_LOG_LEVEL', 'DEBUG')
}
},
}

@ -30,7 +30,8 @@ urlpatterns = [
path('dashboard/', include('cvat.apps.dashboard.urls')),
path('django-rq/', include('django_rq.urls')),
path('auth/', include('cvat.apps.authentication.urls')),
path('documentation/', include('cvat.apps.documentation.urls'))
path('documentation/', include('cvat.apps.documentation.urls')),
path('analytics/', include('cvat.apps.log_viewer.urls'))
]
if 'yes' == os.environ.get('TF_ANNOTATION', 'no'):

@ -50,8 +50,6 @@ services:
WITH_TESTS: "no"
environment:
DJANGO_MODWSGI_EXTRA_ARGS: ""
DJANGO_LOG_SERVER_HOST: ""
DJANGO_LOG_SERVER_PORT: ""
volumes:
- cvat_data:/home/django/data
- cvat_keys:/home/django/keys

Loading…
Cancel
Save