Modified the Filebeat NGINX module, as the NGINX access log format was extended to include upstream metrics.
log_format log_requests '$remote_addr - $remote_user [$time_local] "$request" '
                        '$status $body_bytes_sent "$http_referer" '
                        '"$http_user_agent" '
                        '"$host" $ssl_protocol "$ssl_cipher" "$upstream_addr" '
                        'rt=$request_time uct=$upstream_connect_time '
                        'uht=$upstream_header_time urt=$upstream_response_time';
This is basically the combined log format with the following additional fields (a sample log line is shown after the list):
- $host – server block name, e.g. www.thesmithcave.nz
- $ssl_protocol – TLS protocol, e.g. TLSv1.2
- $ssl_cipher – cipher used for the TLS connection
- $upstream_addr – backend server address (IP and port)
- rt, uct, uht and urt – request time, upstream connect time, upstream header time and upstream response time, in seconds
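With this format a single access-log entry looks like the following (the client address, request and timing values are made up for illustration):

203.0.113.7 - - [10/Oct/2023:13:55:36 +1300] "GET / HTTP/1.1" 200 612 "-" "curl/7.68.0" "www.thesmithcave.nz" TLSv1.2 "ECDHE-RSA-AES256-GCM-SHA384" "127.0.0.1:8080" rt=0.005 uct=0.001 uht=0.004 urt=0.004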
Using this format with Elastic's Filebeat NGINX module requires a few changes to the module's ingest pipeline. The changes are as follows:
- Add gsub processors that replace rt=-, uct=-, uht=- and urt=- with rt=0, uct=0, uht=0 and urt=0, since NGINX logs "-" for the upstream timings when no upstream server was contacted and the GROK pattern expects a number (see the example below)
- Add an additional GROK pattern to parse the extended log format above
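For example, a request served directly by NGINX without contacting an upstream might end in the timings below; the gsub processors rewrite them before the GROK patterns run (values are illustrative only):

Before: ... rt=0.000 uct=- uht=- urt=-
After:  ... rt=0.000 uct=0 uht=0 urt=0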
NOTE: If the module is updated (for example by a Filebeat package upgrade), these modifications will need to be re-applied to the ingest pipeline before ingestion occurs
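After re-applying the modifications, the pipeline on disk still has to be pushed into Elasticsearch. Assuming the Elasticsearch output is configured in filebeat.yml, something like the following should reload the module pipelines (Filebeat may skip a pipeline that is already loaded for the same version unless filebeat.overwrite_pipelines: true is set):

filebeat setup --pipelines --modules nginx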
The full /usr/share/filebeat/module/nginx/access/ingest/pipeline.yml file:
description: Pipeline for parsing Nginx access logs. Requires the geoip and user_agent plugins.
processors:
  - set:
      field: event.ingested
      value: '{{_ingest.timestamp}}'
  - gsub:
      field: message
      pattern: "urt=-"
      replacement: "urt=0"
  - gsub:
      field: message
      pattern: "uht=-"
      replacement: "uht=0"
  - gsub:
      field: message
      pattern: "uct=-"
      replacement: "uct=0"
  - gsub:
      field: message
      pattern: "rt=-"
      replacement: "rt=0"
  - grok:
      field: message
      patterns:
        - (%{NGINX_HOST} )?"?(?:%{NGINX_ADDRESS_LIST:nginx.access.remote_ip_list}|%{NOTSPACE:source.address}) - (-|%{DATA:user.name}) \[%{HTTPDATE:nginx.access.time}\] "%{DATA:nginx.access.info}" %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long} "(-|%{DATA:http.request.referrer})" "(-|%{DATA:user_agent.original})" "%{DATA:url.host}" %{DATA:tls.protocol} "%{DATA:tls.cipher}" "%{DATA:nginx.upstream.backend}" rt=%{NUMBER:nginx.request.time:float} uct=%{NUMBER:nginx.upstream.connect.time:float} uht=%{NUMBER:nginx.upstream.header.time:float} urt=%{NUMBER:nginx.upstream.response.time:float}
        - (%{NGINX_HOST} )?"?(?:%{NGINX_ADDRESS_LIST:nginx.access.remote_ip_list}|%{NOTSPACE:source.address}) - (-|%{DATA:user.name}) \[%{HTTPDATE:nginx.access.time}\] "%{DATA:nginx.access.info}" %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long} "(-|%{DATA:http.request.referrer})" "(-|%{DATA:user_agent.original})"
      pattern_definitions:
        NGINX_HOST: (?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})?
        NGINX_NOTSEPARATOR: "[^\t ,:]+"
        NGINX_ADDRESS_LIST: (?:%{IP}|%{WORD})("?,?\s*(?:%{IP}|%{WORD}))*
      ignore_missing: true
  - grok:
      field: nginx.access.info
      patterns:
        - '%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}'
        - ""
      ignore_missing: true
  - remove:
      field: nginx.access.info
  - split:
      field: nginx.access.remote_ip_list
      separator: '"?,?\s+'
      ignore_missing: true
  - split:
      field: nginx.access.origin
      separator: '"?,?\s+'
      ignore_missing: true
  - set:
      field: source.address
      if: ctx.source?.address == null
      value: ""
  - script:
      if: ctx.nginx?.access?.remote_ip_list != null && ctx.nginx.access.remote_ip_list.length > 0
      lang: painless
      source: >-
        boolean isPrivate(def dot, def ip) {
          try {
            StringTokenizer tok = new StringTokenizer(ip, dot);
            int firstByte = Integer.parseInt(tok.nextToken());
            int secondByte = Integer.parseInt(tok.nextToken());
            if (firstByte == 10) {
              return true;
            }
            if (firstByte == 192 && secondByte == 168) {
              return true;
            }
            if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {
              return true;
            }
            if (firstByte == 127) {
              return true;
            }
            return false;
          } catch (Exception e) {
            return false;
          }
        }
        try {
          ctx.source.address = null;
          if (ctx.nginx.access.remote_ip_list == null) {
            return;
          }
          def found = false;
          for (def item : ctx.nginx.access.remote_ip_list) {
            if (!isPrivate(params.dot, item)) {
              ctx.source.address = item;
              found = true;
              break;
            }
          }
          if (!found) {
            ctx.source.address = ctx.nginx.access.remote_ip_list[0];
          }
        } catch (Exception e) {
          ctx.source.address = null;
        }
      params:
        dot: .
  - remove:
      field: source.address
      if: ctx.source.address == null
  - grok:
      field: source.address
      patterns:
        - ^%{IP:source.ip}$
      ignore_failure: true
  - remove:
      field: message
  - rename:
      field: '@timestamp'
      target_field: event.created
  - date:
      field: nginx.access.time
      target_field: '@timestamp'
      formats:
        - dd/MMM/yyyy:H:m:s Z
      on_failure:
        - append:
            field: error.message
            value: '{{ _ingest.on_failure_message }}'
  - remove:
      field: nginx.access.time
  - user_agent:
      field: user_agent.original
      ignore_missing: true
  - geoip:
      field: source.ip
      target_field: source.geo
      ignore_missing: true
  - geoip:
      database_file: GeoLite2-ASN.mmdb
      field: source.ip
      target_field: source.as
      properties:
        - asn
        - organization_name
      ignore_missing: true
  - rename:
      field: source.as.asn
      target_field: source.as.number
      ignore_missing: true
  - rename:
      field: source.as.organization_name
      target_field: source.as.organization.name
      ignore_missing: true
  - set:
      field: event.kind
      value: event
  - append:
      field: event.category
      value: web
  - append:
      field: event.type
      value: access
  - set:
      field: event.outcome
      value: success
      if: "ctx?.http?.response?.status_code != null && ctx.http.response.status_code < 400"
  - set:
      field: event.outcome
      value: failure
      if: "ctx?.http?.response?.status_code != null && ctx.http.response.status_code >= 400"
  - append:
      field: related.ip
      value: "{{source.ip}}"
      if: "ctx?.source?.ip != null"
  - append:
      field: related.ip
      value: "{{destination.ip}}"
      if: "ctx?.destination?.ip != null"
  - append:
      field: related.user
      value: "{{user.name}}"
      if: "ctx?.user?.name != null"
on_failure:
  - set:
      field: error.message
      value: '{{ _ingest.on_failure_message }}'
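To sanity-check the modified pipeline against a real log line, the Elasticsearch simulate API can be used. The sketch below assumes the pipeline has been loaded as filebeat-7.10.0-nginx-access-pipeline (the exact id depends on the installed Filebeat version) and reuses the hypothetical log line from earlier; @timestamp is included because Filebeat normally supplies it and the pipeline renames it to event.created:

POST _ingest/pipeline/filebeat-7.10.0-nginx-access-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "@timestamp": "2023-10-10T00:55:36.000Z",
        "message": "203.0.113.7 - - [10/Oct/2023:13:55:36 +1300] \"GET / HTTP/1.1\" 200 612 \"-\" \"curl/7.68.0\" \"www.thesmithcave.nz\" TLSv1.2 \"ECDHE-RSA-AES256-GCM-SHA384\" \"127.0.0.1:8080\" rt=0.005 uct=0.001 uht=0.004 urt=0.004"
      }
    }
  ]
}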