Home >Operation and Maintenance >Nginx >How to import nginx logs into elasticsearch

How to import nginx logs into elasticsearch

王林
王林forward
2023-05-13 12:52:11772browse

Collect nginx logs through filebeat and transfer them to logstash. After processing by logstash, they are written to elasticsearch. Filebeat is only responsible for collection work, while logstash completes log formatting, data replacement, splitting, and index creation after writing logs to elasticsearch.

1. Configure nginx log format

log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' 
            '$status $body_bytes_sent $http_referer ' 
            '"$http_user_agent" '
            '"$connection" '
            '"$http_cookie" '
            '$request_time '
            '$upstream_response_time';

2. Install and configure filebeat, enable nginx module

tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -c /usr/local
cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat
cd /usr/local/filebeat

Enable nginx module

./filebeat modules enable nginx

View module

./filebeat modules list

Create configuration file

vim /usr/local/filebeat/blog_module_logstash.yml
filebeat.modules:
- module: nginx
 access:
  enabled: true
  var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
 #error:
 # enabled: true
 # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]


output.logstash:
 hosts: ["192.168.15.91:5044"]

Start filebeat

./filebeat -c blog_module_logstash.yml -e

3. Configure logstash

tar -zxvf logstash-6.2.4.tar.gz /usr/local
cd /usr/local;ln -s logstash-6.2.4 logstash
创建一个nginx日志的pipline文件
cd /usr/local/logstash

logstash built-in template directory

vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns

Edit grok-patterns to add a regular pattern that supports multiple IPs

forword (?:%{ipv4}[,]?[ ]?)+|%{word}

Official grok

#Create logstash pipline Configuration file

#input {
# stdin {}
#}
# 从filebeat接受数据
input {
 beats {
 port => 5044
 host => "0.0.0.0"
 }
}

filter {
 # 添加一个调试的开关
 mutate{add_field => {"[@metadata][debug]"=>true}}
 grok {
 # 过滤nginx日志
 #match => { "message" => "%{nginxaccess_test2}" }
 #match => { "message" => &#39;%{iporhost:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{httpdate:[@metadata][webtime]}\] # %{notspace:hostname} # %{word:verb} %{uripathparam:request} http/%{number:httpversion} # %{number:response} # (?:%{number:bytes}|-) # (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{number:connection}|-)"|%{number:connection}|-) # (?:"(?<cookies>[^#]*)") # %{number:request_time:float} # (?:%{number:upstream_response_time:float}|-)&#39; }
 #match => { "message" => &#39;(?:%{iporhost:clientip}|-) (?:%{two_ip:http_x_forwarded_for}|%{ipv4:http_x_forwarded_for}|-) \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) (?:"(?<cookies>[^#]*)") %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)&#39; }
    match => { "message" => &#39;(?:%{iporhost:clientip}|-) %{forword:http_x_forwarded_for} \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) %{qs:cookie} %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)&#39; }
 }
 # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
 ruby { 
 #code => "event.set(&#39;@read_timestamp&#39;,event.get(&#39;@timestamp&#39;))"
 #将时区改为东8区
 code => "event.set(&#39;@read_timestamp&#39;,event.get(&#39;@timestamp&#39;).time.localtime + 8*60*60)"
 }
 # 将nginx的日志记录时间格式化
 # 格式化时间 20/may/2015:21:05:56 +0000
 date {
 locale => "en"
 match => ["[@metadata][webtime]","dd/mmm/yyyy:hh:mm:ss z"]
 }
 # 将bytes字段由字符串转换为数字
 mutate {
 convert => {"bytes" => "integer"}
 }
 # 将cookie字段解析成一个json
 #mutate {
 # gsub => ["cookies",&#39;\;&#39;,&#39;,&#39;]
 #} 
 # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
 if[http_x_forwarded_for] =~ ", "{
     ruby {
         code => &#39;event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])&#39;
        }
    }
 # 解析ip,获得ip的地理位置
 geoip {
 source => "http_x_forwarded_for"
 # # 只获取ip的经纬度、国家、城市、时区
 fields => ["location","country_name","city_name","region_name"] 
 }
 # 将agent字段解析,获得浏览器、系统版本等具体信息
 useragent {
 source => "agent"
 target => "useragent"
 }
 #指定要删除的数据
 #mutate{remove_field=>["message"]}
 # 根据日志名设置索引名的前缀
 ruby {
 code => &#39;event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])&#39;
 } 
 # 将@timestamp 格式化为2019.04.23
 ruby {
 code => &#39;event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%y.%m.%d"))&#39;
 }
 # 设置输出的默认索引名
 mutate {
 add_field => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+yyyy.mm.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
 }
 }
 # 将cookies字段解析成json
# mutate {
# gsub => [
#  "cookies", ";", ",",
#  "cookies", "=", ":"
# ]
# #split => {"cookies" => ","}
# }
# json_encode {
# source => "cookies"
# target => "cookies_json"
# }
# mutate {
# gsub => [
#  "cookies_json", &#39;,&#39;, &#39;","&#39;,
#  "cookies_json", &#39;:&#39;, &#39;":"&#39;
# ]
# }
# json {
# source => "cookies_json"
# target => "cookies2"
# }
 # 如果grok解析存在错误,将错误独立写入一个索引
 if "_grokparsefailure" in [tags] {
 #if "_dateparsefailure" in [tags] {
 mutate {
  replace => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+yyyy.mm.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
  }
 }
 # 如果不存在错误就删除message
 }else{
 mutate{remove_field=>["message"]}
 }
}

output {
 if [@metadata][debug]{
 # 输出到rubydebuyg并输出metadata
 stdout{codec => rubydebug{metadata => true}}
 }else{
 # 将输出内容转换成 "."
 stdout{codec => dots} 
 # 将输出到指定的es
 elasticsearch {
  hosts => ["192.168.15.160:9200"]
  index => "%{[@metadata][index]}"
  document_type => "doc"
 } 
 }
}

Start logstash

nohup bin/logstash -f test_pipline2.conf &

The above is the detailed content of How to import nginx logs into elasticsearch. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete