Logstash persistent queue improvement (#2744)

Co-authored-by: Andrey Zhavoronkov <andrey.zhavoronkov@intel.com>
main
PMazarovich 5 years ago committed by GitHub
parent e43707d779
commit 7720a8f6c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Logstash is improved for using with configurable elasticsearch outputs (<https://github.com/openvinotoolkit/cvat/pull/2531>) - Logstash is improved for using with configurable elasticsearch outputs (<https://github.com/openvinotoolkit/cvat/pull/2531>)
- Bumped nuclio version to 1.5.16 - Bumped nuclio version to 1.5.16
- All methods for interative segmentation accept negative points as well - All methods for interative segmentation accept negative points as well
- Persistent queue added to logstash (<https://github.com/openvinotoolkit/cvat/pull/2744>)
### Deprecated ### Deprecated

@ -79,7 +79,7 @@ services:
cvat: cvat:
environment: environment:
DJANGO_LOG_SERVER_HOST: logstash DJANGO_LOG_SERVER_HOST: logstash
DJANGO_LOG_SERVER_PORT: 5000 DJANGO_LOG_SERVER_PORT: 8080
DJANGO_LOG_VIEWER_HOST: kibana DJANGO_LOG_VIEWER_HOST: kibana
DJANGO_LOG_VIEWER_PORT: 5601 DJANGO_LOG_VIEWER_PORT: 5601
CVAT_ANALYTICS: 1 CVAT_ANALYTICS: 1

@ -3,5 +3,6 @@ FROM docker.elastic.co/logstash/logstash-oss:${ELK_VERSION}
RUN logstash-plugin install logstash-input-http logstash-filter-aggregate \ RUN logstash-plugin install logstash-input-http logstash-filter-aggregate \
logstash-filter-prune logstash-output-email logstash-filter-prune logstash-output-email
COPY logstash.conf /usr/share/logstash/pipeline/ COPY logstash.yml /usr/share/logstash/config/
EXPOSE 5000 COPY logstash.conf /usr/share/logstash/pipeline/
EXPOSE 8080

@ -1,11 +1,22 @@
input { input {
tcp { http {
port => 5000 port => 8080
codec => json codec => json
} }
} }
filter { filter {
mutate {
add_field => {"logger_name" => ""}
add_field => {"path" =>""}
}
mutate {
copy => {"[extra][logger_name]" => "logger_name" }
copy => {"[extra][path]"=>"path"}
}
prune {
blacklist_names => ["type","logsource","extra","program","pid","headers"]
}
if [logger_name] =~ /cvat.client/ { if [logger_name] =~ /cvat.client/ {
# 1. Decode the event from json in 'message' field # 1. Decode the event from json in 'message' field
# 2. Remove unnecessary field from it # 2. Remove unnecessary field from it
@ -14,6 +25,9 @@ filter {
mutate { mutate {
rename => { "message" => "source_message" } rename => { "message" => "source_message" }
} }
mutate {
add_field => {"[@metadata][target_index_client]" => "cvat.client.%{+YYYY}.%{+MM}"}
}
json { json {
source => "source_message" source => "source_message"
@ -77,6 +91,9 @@ filter {
# 2. Remove unnecessary field from it # 2. Remove unnecessary field from it
# 3. Type it as server # 3. Type it as server
if [logger_name] =~ /cvat\.server\.task_[0-9]+/ { if [logger_name] =~ /cvat\.server\.task_[0-9]+/ {
mutate {
add_field => {"[@metadata][target_index_server]" => "cvat.server.%{+YYYY}.%{+MM}"}
}
mutate { mutate {
rename => { "logger_name" => "task_id" } rename => { "logger_name" => "task_id" }
gsub => [ "task_id", "cvat.server.task_", "" ] gsub => [ "task_id", "cvat.server.task_", "" ]

@ -0,0 +1,3 @@
queue.type: persisted
queue.max_bytes: 1gb
queue.checkpoint.writes: 20

@ -20,7 +20,7 @@ rq-scheduler==0.10.0
sqlparse==0.3.1 sqlparse==0.3.1
django-sendfile==0.3.11 django-sendfile==0.3.11
dj-pagination==2.5.0 dj-pagination==2.5.0
python-logstash==0.4.6 python-logstash-async==2.2.0
django-revproxy==0.10.0 django-revproxy==0.10.0
rules==2.2 rules==2.2
GitPython==3.1.8 GitPython==3.1.8

@ -336,7 +336,10 @@ STATIC_ROOT = os.path.join(BASE_DIR, 'static')
os.makedirs(STATIC_ROOT, exist_ok=True) os.makedirs(STATIC_ROOT, exist_ok=True)
DATA_ROOT = os.path.join(BASE_DIR, 'data') DATA_ROOT = os.path.join(BASE_DIR, 'data')
LOGSTASH_DB = os.path.join(DATA_ROOT,'logstash.db')
os.makedirs(DATA_ROOT, exist_ok=True) os.makedirs(DATA_ROOT, exist_ok=True)
if not os.path.exists(LOGSTASH_DB):
os.mknod(LOGSTASH_DB)
MEDIA_DATA_ROOT = os.path.join(DATA_ROOT, 'data') MEDIA_DATA_ROOT = os.path.join(DATA_ROOT, 'data')
os.makedirs(MEDIA_DATA_ROOT, exist_ok=True) os.makedirs(MEDIA_DATA_ROOT, exist_ok=True)
@ -366,6 +369,11 @@ LOGGING = {
'version': 1, 'version': 1,
'disable_existing_loggers': False, 'disable_existing_loggers': False,
'formatters': { 'formatters': {
'logstash': {
'()': 'logstash_async.formatter.DjangoLogstashFormatter',
'message_type': 'python-logstash',
'fqdn': False, # Fully qualified domain name. Default value: false.
},
'standard': { 'standard': {
'format': '[%(asctime)s] %(levelname)s %(name)s: %(message)s' 'format': '[%(asctime)s] %(levelname)s %(name)s: %(message)s'
} }
@ -386,11 +394,16 @@ LOGGING = {
}, },
'logstash': { 'logstash': {
'level': 'INFO', 'level': 'INFO',
'class': 'logstash.TCPLogstashHandler', 'class': 'logstash_async.handler.AsynchronousLogstashHandler',
'formatter': 'logstash',
'transport': 'logstash_async.transport.HttpTransport',
'ssl_enable': False,
'ssl_verify': False,
'host': os.getenv('DJANGO_LOG_SERVER_HOST', 'localhost'), 'host': os.getenv('DJANGO_LOG_SERVER_HOST', 'localhost'),
'port': os.getenv('DJANGO_LOG_SERVER_PORT', 5000), 'port': os.getenv('DJANGO_LOG_SERVER_PORT', 8080),
'version': 1, 'version': 1,
'message_type': 'django', 'message_type': 'django',
'database_path': LOGSTASH_DB,
} }
}, },
'loggers': { 'loggers': {

Loading…
Cancel
Save