# Logstash pipeline for CVAT logs.
#
# input:  JSON events received over HTTP on port 8080.
# filter: events are routed on `logger_name` (copied from [extra][logger_name])
#         and shaped as either "client" or "server" events.
# output: every event is printed to stdout; events typed "client" or "server"
#         are also sent to Elasticsearch, into the index name stored under
#         [@metadata] by the filter stage.
input {
  http {
    port => 8080
    codec => json
  }
}

filter {
  # Pre-create `logger_name` and `path` as empty strings, then copy the
  # values found under [extra] over them.
  mutate {
    add_field => {
      "logger_name" => ""
      "path" => ""
    }
  }

  mutate {
    copy => {
      "[extra][logger_name]" => "logger_name"
      "[extra][path]" => "path"
    }
  }

  prune {
    blacklist_names => ["type", "logsource", "extra", "program", "pid", "headers"]
  }

  if [logger_name] =~ /cvat\.client/ {
    # 1. Decode the event from json in 'message' field
    # 2. Remove unnecessary fields from it
    # 3. Type it as client
    mutate {
      rename => { "message" => "source_message" }
    }

    mutate {
      add_field => { "[@metadata][target_index_client]" => "cvat.client.%{+YYYY}.%{+MM}" }
    }

    json {
      source => "source_message"
    }

    date {
      match => ["time", "ISO8601"]
      remove_field => "time"
    }

    # Promote every key of the decoded 'payload' object to a top-level field.
    if [payload] {
      ruby {
        code => "
          event.get('payload').each { |key, value|
            event.set(key, value)
          }
        "
      }
    }

    # Group identical client exceptions (same user, message, file and line)
    # and emit one summary event, tagged 'aggregated_exception', when the
    # one-hour timeout expires.
    # NOTE(review): the pushed summary event is built from the map only, so it
    # has no 'type' or [@metadata] index set here - confirm whether it is meant
    # to reach Elasticsearch.
    if [name] == "Send exception" {
      aggregate {
        task_id => "%{username}_%{message}_%{filename}_%{line}"
        code => "
          require 'time'

          map['username'] ||= event.get('username');
          map['error'] ||= event.get('message');
          map['filename'] ||= event.get('filename');
          map['line'] ||= event.get('line');
          map['task_id'] ||= event.get('task_id');
          map['job_id'] ||= event.get('job_id');

          map['error_count'] ||= 0;
          map['error_count'] += 1;

          map['aggregated_stack'] ||= '';
          map['aggregated_stack'] += event.get('stack').to_s + '\n\n\n';
        "
        timeout => 3600
        timeout_tags => ['aggregated_exception']
        push_map_as_event_on_timeout => true
      }
    }

    prune {
      blacklist_names => ["level", "host", "logger_name", "path", "port", "stack_info", "payload", "source_message"]
    }

    mutate {
      replace => { "type" => "client" }
      rename => ["working time", "working_time"]
      copy => {
        "job_id" => "task"
        "username" => "userid"
        "name" => "event"
      }
    }
  } else if [logger_name] =~ /cvat\.server/ {
    # 1. For task loggers, turn 'logger_name' into an integer 'task_id' field
    # 2. Remove unnecessary fields from it
    # 3. Type it as server

    # Set the target index for every server event, not only task-specific
    # ones: all of them are typed "server" below, and the Elasticsearch output
    # interpolates this metadata field into the index name.
    mutate {
      add_field => { "[@metadata][target_index_server]" => "cvat.server.%{+YYYY}.%{+MM}" }
    }

    if [logger_name] =~ /cvat\.server\.task_[0-9]+/ {
      mutate {
        rename => { "logger_name" => "task_id" }
        gsub => [ "task_id", "cvat.server.task_", "" ]
      }

      # Need to split the mutate because otherwise the conversion
      # doesn't work.
      mutate {
        convert => { "task_id" => "integer" }
      }
    }

    prune {
      blacklist_names => ["host", "port", "stack_info"]
    }

    mutate {
      replace => { "type" => "server" }
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }

  if [type] == "client" {
    elasticsearch {
      hosts => ["${LOGSTASH_OUTPUT_HOST}"]
      index => "%{[@metadata][target_index_client]}"
      user => "${LOGSTASH_OUTPUT_USER:}"
      password => "${LOGSTASH_OUTPUT_PASS:}"
      manage_template => false
    }
  } else if [type] == "server" {
    elasticsearch {
      hosts => ["${LOGSTASH_OUTPUT_HOST}"]
      index => "%{[@metadata][target_index_server]}"
      user => "${LOGSTASH_OUTPUT_USER:}"
      password => "${LOGSTASH_OUTPUT_PASS:}"
      manage_template => false
    }
  }
}