使用flume+kafka+storm构建实时日志分析系统
本文只会涉及flume和kafka的结合,kafka和storm的结合可以参考其他博客1. flume安装使用
下载flume安装包http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
解压$ tar -xzvf apache-flume-1.5.2-bin.tar.gz -C /opt/flume
flume配置文件放在conf文件目录下,执行文件放在bin文件目录下。
1)配置flume
进入conf目录将flume-conf.properties.template拷贝一份,并命名为自己需要的名字
$ cp flume-conf.properties.template flume.conf
修改flume.conf的内容,我们使用file sink来接收channel中的数据,channel采用memory channel,source采用exec source,配置文件如下:
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>agent.sources = seqGenSrc</li><li>agent.channels = memoryChannel</li><li>agent.sinks = loggerSink</li><li></li><li># For each one of the sources, the type is defined</li><li>agent.sources.seqGenSrc.type = exec</li><li>agent.sources.seqGenSrc.command = tail -F /data/mongodata/mongo.log</li><li>#agent.sources.seqGenSrc.bind = 172.168.49.130</li><li></li><li># The channel can be defined as follows.</li><li>agent.sources.seqGenSrc.channels = memoryChannel</li><li></li><li># Each sink's type must be defined</li><li>agent.sinks.loggerSink.type = file_roll</li><li>agent.sinks.loggerSink.sink.directory = /data/flume</li><li></li><li>#Specify the channel the sink should use</li><li>agent.sinks.loggerSink.channel = memoryChannel</li><li></li><li># Each channel's type is defined.</li><li>agent.channels.memoryChannel.type = memory</li><li></li><li># Other config values specific to each type of channel(sink or source)</li><li># can be defined as well</li><li># In this case, it specifies the capacity of the memory channel</li><li>agent.channels.memoryChannel.capacity = 1000</li><li>agent.channels.memory4log.transactionCapacity = 100</li></ol>2)运行flume agent
切换到bin目录下,运行一下命令:
$ ./flume-ng agent --conf ../conf -f ../conf/flume.conf --n agent -Dflume.root.logger=INFO,console
在/data/flume目录下可以看到生成的日志文件。
2. 结合kafka
由于flume1.5.2没有kafka sink,所以需要自己开发kafka sink
可以参考flume 1.6里面的kafka sink,但是要注意使用的kafka版本,由于有些kafka api不兼容的
这里只提供核心代码,process()内容。
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>Sink.Status status = Status.READY;<br /> </li><li><br /></li><li>Channel ch = getChannel();<br /></li><li>Transaction transaction = null;<br /></li><li>Event event = null;<br /></li><li>String eventTopic = null;<br /></li><li>String eventKey = null;<br /></li><li><br /></li><li>try {<br /></li><li>transaction = ch.getTransaction();<br /></li><li>transaction.begin();<br /></li><li>messageList.clear();<br /></li><li><br /></li><li>if (type.equals("sync")) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event != null) {<br /></li><li> byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li> eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li> eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> }<br /></li><li> <br /></li><li> ProducerData<String, Message> data = new ProducerData<String, Message><br /></li><li> (eventTopic, new Message(tempBody));<br /></li><li> <br /></li><li> long startTime = System.nanoTime();<br /></li><li> logger.debug(eventTopic+"++++"+eventBody);<br /></li><li> producer.send(data);<br /></li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>} else {<br /></li><li>long processedEvents = 0;<br /></li><li>for (; processedEvents < batchSize; processedEvents += 1) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event == null) {<br /></li><li> break;<br /></li><li> }<br /></li><li><br /></li><li> byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li> eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li> eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> logger.debug("event #{}", processedEvents);<br /></li><li> }<br /></li><li><br /></li><li> // create a message and add to buffer<br /></li><li> ProducerData<String, String> data = new ProducerData<String, String><br /></li><li> (eventTopic, eventBody);<br /></li><li> messageList.add(data);<br /></li><li>}<br /></li><li><br /></li><li>// publish batch and commit.<br /></li><li> if (processedEvents > 0) {<br /></li><li> long startTime = System.nanoTime(); </li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>}<br /></li><li><br /></li><li>transaction.commit();<br /></li><li>} catch (Exception ex) {<br /></li><li>String errorMsg = "Failed to publish events";<br /></li><li>logger.error("Failed to publish events", ex);<br /></li><li>status = Status.BACKOFF;<br /></li><li>if (transaction != null) {<br /></li><li>try {<br /></li><li>transaction.rollback(); </li><li>} catch (Exception e) {<br /></li><li>logger.error("Transaction rollback failed", e);<br /></li><li>throw Throwables.propagate(e);<br /></li><li>}<br /></li><li>}<br /></li><li>throw new EventDeliveryException(errorMsg, ex);<br /></li><li>} finally {<br /></li><li>if (transaction != null) {<br /></li><li>transaction.close();<br /></li><li>}<br /></li><li>}<br /></li><li><br /></li><li>return status; </li></ol>下一步,修改flume配置文件,将其中sink部分的配置改成kafka sink,如:
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink<br /> </li><li>producer.sinks.r.brokerList = bigdata-node00:9092<br /></li><li>producer.sinks.r.requiredAcks = 1<br /></li><li>producer.sinks.r.batchSize = 100<br /></li><li>#producer.sinks.r.kafka.producer.type=async<br /></li><li>#producer.sinks.r.kafka.customer.encoding=UTF-8<br /></li><li>producer.sinks.r.topic = testFlume1</li></ol>type指向kafkasink所在的完整路径
下面的参数都是kafka的一系列参数,最重要的是brokerList和topic参数
现在重新启动flume,就可以在kafka的对应topic下查看到对应的日志

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

MantisBT
Mantis is an easy-to-deploy web-based defect tracking tool designed to aid in product defect tracking. It requires PHP, MySQL and a web server. Check out our demo and hosting services.

MinGW - Minimalist GNU for Windows
This project is in the process of being migrated to osdn.net/projects/mingw, you can continue to follow us there. MinGW: A native Windows port of the GNU Compiler Collection (GCC), freely distributable import libraries and header files for building native Windows applications; includes extensions to the MSVC runtime to support C99 functionality. All MinGW software can run on 64-bit Windows platforms.

SublimeText3 English version
Recommended: Win version, supports code prompts!

PhpStorm Mac version
The latest (2018.2.1) professional PHP integrated development tool

EditPlus Chinese cracked version
Small size, syntax highlighting, does not support code prompt function
