將nginx日誌經過filebeat收集後傳入logstash,經過logstash處理後寫入elasticsearch。 filebeat只負責收集工作,logstash完成日誌的格式化,資料的替換,拆分 ,以及將日誌寫入elasticsearch後的索引的建立。
1、設定nginx日誌格式
log_format main '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' '$status $body_bytes_sent $http_referer ' '"$http_user_agent" ' '"$connection" ' '"$http_cookie" ' '$request_time ' '$upstream_response_time';
#2、安裝設定filebeat,啟用nginx module
tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -c /usr/local cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat cd /usr/local/filebeat
啟用nginx模組
./filebeat modules enable nginx
查看模組
./filebeat modules list
建立設定檔
vim /usr/local/filebeat/blog_module_logstash.yml filebeat.modules: - module: nginx access: enabled: true var.paths: ["/home/weblog/blog.cnfol.com_access.log"] #error: # enabled: true # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"] output.logstash: hosts: ["192.168.15.91:5044"]
#啟動filebeat
./filebeat -c blog_module_logstash.yml -e
3、設定logstash
#tar -zxvf logstash-6.2.4.tar.gz /usr/local cd /usr/local;ln -s logstash-6.2.4 logstash 创建一个nginx日志的pipline文件 cd /usr/local/logstash
logstash內建的範本目錄
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
編輯grok-patterns 新增一個支援多ip的正規
forword (?:%{ipv4}[,]?[ ]?)+|%{word}
官方grok
#建立logstash pipline設定檔
#input { # stdin {} #} # 从filebeat接受数据 input { beats { port => 5044 host => "0.0.0.0" } } filter { # 添加一个调试的开关 mutate{add_field => {"[@metadata][debug]"=>true}} grok { # 过滤nginx日志 #match => { "message" => "%{nginxaccess_test2}" } #match => { "message" => '%{iporhost:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{httpdate:[@metadata][webtime]}\] # %{notspace:hostname} # %{word:verb} %{uripathparam:request} http/%{number:httpversion} # %{number:response} # (?:%{number:bytes}|-) # (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{number:connection}|-)"|%{number:connection}|-) # (?:"(?<cookies>[^#]*)") # %{number:request_time:float} # (?:%{number:upstream_response_time:float}|-)' } #match => { "message" => '(?:%{iporhost:clientip}|-) (?:%{two_ip:http_x_forwarded_for}|%{ipv4:http_x_forwarded_for}|-) \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) (?:"(?<cookies>[^#]*)") %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' } match => { "message" => '(?:%{iporhost:clientip}|-) %{forword:http_x_forwarded_for} \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) %{qs:cookie} %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' } } # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp ruby { #code => "event.set('@read_timestamp',event.get('@timestamp'))" #将时区改为东8区 code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)" } # 将nginx的日志记录时间格式化 # 格式化时间 20/may/2015:21:05:56 +0000 date { locale => "en" match => ["[@metadata][webtime]","dd/mmm/yyyy:hh:mm:ss z"] } # 将bytes字段由字符串转换为数字 mutate { convert => {"bytes" => "integer"} } # 将cookie字段解析成一个json #mutate { # gsub => ["cookies",'\;',','] #} # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip if[http_x_forwarded_for] =~ ", "{ ruby { code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])' } } # 解析ip,获得ip的地理位置 geoip { source => "http_x_forwarded_for" # # 只获取ip的经纬度、国家、城市、时区 fields => ["location","country_name","city_name","region_name"] } # 将agent字段解析,获得浏览器、系统版本等具体信息 useragent { source => "agent" target => "useragent" } #指定要删除的数据 #mutate{remove_field=>["message"]} # 根据日志名设置索引名的前缀 ruby { code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])' } # 将@timestamp 格式化为2019.04.23 ruby { code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%y.%m.%d"))' } # 设置输出的默认索引名 mutate { add_field => { #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+yyyy.mm.dd}" "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}" } } # 将cookies字段解析成json # mutate { # gsub => [ # "cookies", ";", ",", # "cookies", "=", ":" # ] # #split => {"cookies" => ","} # } # json_encode { # source => "cookies" # target => "cookies_json" # } # mutate { # gsub => [ # "cookies_json", ',', '","', # "cookies_json", ':', '":"' # ] # } # json { # source => "cookies_json" # target => "cookies2" # } # 如果grok解析存在错误,将错误独立写入一个索引 if "_grokparsefailure" in [tags] { #if "_dateparsefailure" in [tags] { mutate { replace => { #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+yyyy.mm.dd}" "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}" } } # 如果不存在错误就删除message }else{ mutate{remove_field=>["message"]} } } output { if [@metadata][debug]{ # 输出到rubydebuyg并输出metadata stdout{codec => rubydebug{metadata => true}} }else{ # 将输出内容转换成 "." stdout{codec => dots} # 将输出到指定的es elasticsearch { hosts => ["192.168.15.160:9200"] index => "%{[@metadata][index]}" document_type => "doc" } } }
啟動logstash
nohup bin/logstash -f test_pipline2.conf &
以上是怎麼將nginx日誌匯入elasticsearch的詳細內容。更多資訊請關注PHP中文網其他相關文章!