Maison  >  Article  >  Opération et maintenance  >  Comment importer les journaux nginx dans elasticsearch

Comment importer les journaux nginx dans elasticsearch

王林
王林avant
2023-05-13 12:52:11712parcourir

Collectez les journaux nginx via filebeat et transférez-les vers logstash. Après traitement par logstash, ils sont écrits dans elasticsearch. Filebeat est uniquement responsable du travail de collecte, tandis que logstash effectue le formatage des journaux, le remplacement des données, le fractionnement et la création d'index après avoir écrit les journaux dans elasticsearch.

1. Configurez le format de journal nginx

log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' 
            '$status $body_bytes_sent $http_referer ' 
            '"$http_user_agent" '
            '"$connection" '
            '"$http_cookie" '
            '$request_time '
            '$upstream_response_time';

2. Installez et configurez filebeat, activez le module nginx

tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -c /usr/local
cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat
cd /usr/local/filebeat

Activez le module nginx

./filebeat modules enable nginx

Affichez le module

./filebeat modules list

Créez un fichier de configuration

vim /usr/local/filebeat/blog_module_logstash.yml
filebeat.modules:
- module: nginx
 access:
  enabled: true
  var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
 #error:
 # enabled: true
 # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]


output.logstash:
 hosts: ["192.168.15.91:5044"]

Démarrez le fichier. battre

./filebeat -c blog_module_logstash.yml -e

3. Configurez logstash

tar -zxvf logstash-6.2.4.tar.gz /usr/local
cd /usr/local;ln -s logstash-6.2.4 logstash
创建一个nginx日志的pipline文件
cd /usr/local/logstash

répertoire de modèles intégré de logstash

vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns

éditez les modèles grok et ajoutez un modèle régulier qui prend en charge plusieurs IP

forword (?:%{ipv4}[,]?[ ]?)+|%{word}

grok officiel

#

créer un fichier de configuration de pipeline logstash

#input {
# stdin {}
#}
# 从filebeat接受数据
input {
 beats {
 port => 5044
 host => "0.0.0.0"
 }
}

filter {
 # 添加一个调试的开关
 mutate{add_field => {"[@metadata][debug]"=>true}}
 grok {
 # 过滤nginx日志
 #match => { "message" => "%{nginxaccess_test2}" }
 #match => { "message" => &#39;%{iporhost:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{httpdate:[@metadata][webtime]}\] # %{notspace:hostname} # %{word:verb} %{uripathparam:request} http/%{number:httpversion} # %{number:response} # (?:%{number:bytes}|-) # (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{number:connection}|-)"|%{number:connection}|-) # (?:"(?<cookies>[^#]*)") # %{number:request_time:float} # (?:%{number:upstream_response_time:float}|-)&#39; }
 #match => { "message" => &#39;(?:%{iporhost:clientip}|-) (?:%{two_ip:http_x_forwarded_for}|%{ipv4:http_x_forwarded_for}|-) \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) (?:"(?<cookies>[^#]*)") %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)&#39; }
    match => { "message" => &#39;(?:%{iporhost:clientip}|-) %{forword:http_x_forwarded_for} \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) %{qs:cookie} %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)&#39; }
 }
 # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
 ruby { 
 #code => "event.set(&#39;@read_timestamp&#39;,event.get(&#39;@timestamp&#39;))"
 #将时区改为东8区
 code => "event.set(&#39;@read_timestamp&#39;,event.get(&#39;@timestamp&#39;).time.localtime + 8*60*60)"
 }
 # 将nginx的日志记录时间格式化
 # 格式化时间 20/may/2015:21:05:56 +0000
 date {
 locale => "en"
 match => ["[@metadata][webtime]","dd/mmm/yyyy:hh:mm:ss z"]
 }
 # 将bytes字段由字符串转换为数字
 mutate {
 convert => {"bytes" => "integer"}
 }
 # 将cookie字段解析成一个json
 #mutate {
 # gsub => ["cookies",&#39;\;&#39;,&#39;,&#39;]
 #} 
 # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
 if[http_x_forwarded_for] =~ ", "{
     ruby {
         code => &#39;event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])&#39;
        }
    }
 # 解析ip,获得ip的地理位置
 geoip {
 source => "http_x_forwarded_for"
 # # 只获取ip的经纬度、国家、城市、时区
 fields => ["location","country_name","city_name","region_name"] 
 }
 # 将agent字段解析,获得浏览器、系统版本等具体信息
 useragent {
 source => "agent"
 target => "useragent"
 }
 #指定要删除的数据
 #mutate{remove_field=>["message"]}
 # 根据日志名设置索引名的前缀
 ruby {
 code => &#39;event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])&#39;
 } 
 # 将@timestamp 格式化为2019.04.23
 ruby {
 code => &#39;event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%y.%m.%d"))&#39;
 }
 # 设置输出的默认索引名
 mutate {
 add_field => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+yyyy.mm.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
 }
 }
 # 将cookies字段解析成json
# mutate {
# gsub => [
#  "cookies", ";", ",",
#  "cookies", "=", ":"
# ]
# #split => {"cookies" => ","}
# }
# json_encode {
# source => "cookies"
# target => "cookies_json"
# }
# mutate {
# gsub => [
#  "cookies_json", &#39;,&#39;, &#39;","&#39;,
#  "cookies_json", &#39;:&#39;, &#39;":"&#39;
# ]
# }
# json {
# source => "cookies_json"
# target => "cookies2"
# }
 # 如果grok解析存在错误,将错误独立写入一个索引
 if "_grokparsefailure" in [tags] {
 #if "_dateparsefailure" in [tags] {
 mutate {
  replace => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+yyyy.mm.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
  }
 }
 # 如果不存在错误就删除message
 }else{
 mutate{remove_field=>["message"]}
 }
}

output {
 if [@metadata][debug]{
 # 输出到rubydebuyg并输出metadata
 stdout{codec => rubydebug{metadata => true}}
 }else{
 # 将输出内容转换成 "."
 stdout{codec => dots} 
 # 将输出到指定的es
 elasticsearch {
  hosts => ["192.168.15.160:9200"]
  index => "%{[@metadata][index]}"
  document_type => "doc"
 } 
 }
}

start logstash

nohup bin/logstash -f test_pipline2.conf &

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer