搜索
首页后端开发php教程使用flume+kafka+storm构建实时日志分析系统_PHP教程

使用flume+kafka+storm构建实时日志分析系统

本文只会涉及flume和kafka的结合,kafka和storm的结合可以参考其他博客
1. flume安装使用
下载flume安装包http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
解压$ tar -xzvf apache-flume-1.5.2-bin.tar.gz -C /opt/flume
flume配置文件放在conf文件目录下,执行文件放在bin文件目录下。
1)配置flume
进入conf目录将flume-conf.properties.template拷贝一份,并命名为自己需要的名字
$ cp flume-conf.properties.template flume.conf
修改flume.conf的内容,我们使用file sink来接收channel中的数据,channel采用memory channel,source采用exec source,配置文件如下:
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>agent.sources = seqGenSrc</li><li>agent.channels = memoryChannel</li><li>agent.sinks = loggerSink</li><li></li><li># For each one of the sources, the type is defined</li><li>agent.sources.seqGenSrc.type = exec</li><li>agent.sources.seqGenSrc.command = tail -F /data/mongodata/mongo.log</li><li>#agent.sources.seqGenSrc.bind = 172.168.49.130</li><li></li><li># The channel can be defined as follows.</li><li>agent.sources.seqGenSrc.channels = memoryChannel</li><li></li><li># Each sink's type must be defined</li><li>agent.sinks.loggerSink.type = file_roll</li><li>agent.sinks.loggerSink.sink.directory = /data/flume</li><li></li><li>#Specify the channel the sink should use</li><li>agent.sinks.loggerSink.channel = memoryChannel</li><li></li><li># Each channel's type is defined.</li><li>agent.channels.memoryChannel.type = memory</li><li></li><li># Other config values specific to each type of channel(sink or source)</li><li># can be defined as well</li><li># In this case, it specifies the capacity of the memory channel</li><li>agent.channels.memoryChannel.capacity = 1000</li><li>agent.channels.memory4log.transactionCapacity = 100</li></ol>
2)运行flume agent
切换到bin目录下,运行一下命令:
$ ./flume-ng agent --conf ../conf -f ../conf/flume.conf --n agent -Dflume.root.logger=INFO,console
在/data/flume目录下可以看到生成的日志文件。

2. 结合kafka
由于flume1.5.2没有kafka sink,所以需要自己开发kafka sink
可以参考flume 1.6里面的kafka sink,但是要注意使用的kafka版本,由于有些kafka api不兼容的
这里只提供核心代码,process()内容。

<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>Sink.Status status = Status.READY;<br /> </li><li><br /></li><li>Channel ch = getChannel();<br /></li><li>Transaction transaction = null;<br /></li><li>Event event = null;<br /></li><li>String eventTopic = null;<br /></li><li>String eventKey = null;<br /></li><li><br /></li><li>try {<br /></li><li>transaction = ch.getTransaction();<br /></li><li>transaction.begin();<br /></li><li>messageList.clear();<br /></li><li><br /></li><li>if (type.equals("sync")) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event != null) {<br /></li><li>        byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li>          eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li>        eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> }<br /></li><li> <br /></li><li>        ProducerData<String, Message> data = new ProducerData<String, Message><br /></li><li> (eventTopic, new Message(tempBody));<br /></li><li> <br /></li><li> long startTime = System.nanoTime();<br /></li><li> logger.debug(eventTopic+"++++"+eventBody);<br /></li><li>        producer.send(data);<br /></li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>} else {<br /></li><li>long processedEvents = 0;<br /></li><li>for (; processedEvents < batchSize; processedEvents += 1) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event == null) {<br /></li><li> break;<br /></li><li> }<br /></li><li><br /></li><li> byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li>          eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li>        eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> logger.debug("event #{}", processedEvents);<br /></li><li> }<br /></li><li><br /></li><li> // create a message and add to buffer<br /></li><li>        ProducerData<String, String> data = new ProducerData<String, String><br /></li><li> (eventTopic, eventBody);<br /></li><li>        messageList.add(data);<br /></li><li>}<br /></li><li><br /></li><li>// publish batch and commit.<br /></li><li> if (processedEvents > 0) {<br /></li><li> long startTime = System.nanoTime(); </li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>}<br /></li><li><br /></li><li>transaction.commit();<br /></li><li>} catch (Exception ex) {<br /></li><li>String errorMsg = "Failed to publish events";<br /></li><li>logger.error("Failed to publish events", ex);<br /></li><li>status = Status.BACKOFF;<br /></li><li>if (transaction != null) {<br /></li><li>try {<br /></li><li>transaction.rollback(); </li><li>} catch (Exception e) {<br /></li><li>logger.error("Transaction rollback failed", e);<br /></li><li>throw Throwables.propagate(e);<br /></li><li>}<br /></li><li>}<br /></li><li>throw new EventDeliveryException(errorMsg, ex);<br /></li><li>} finally {<br /></li><li>if (transaction != null) {<br /></li><li>transaction.close();<br /></li><li>}<br /></li><li>}<br /></li><li><br /></li><li>return status; </li></ol>
下一步,修改flume配置文件,将其中sink部分的配置改成kafka sink,如:

<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink<br /> </li><li>producer.sinks.r.brokerList = bigdata-node00:9092<br /></li><li>producer.sinks.r.requiredAcks = 1<br /></li><li>producer.sinks.r.batchSize = 100<br /></li><li>#producer.sinks.r.kafka.producer.type=async<br /></li><li>#producer.sinks.r.kafka.customer.encoding=UTF-8<br /></li><li>producer.sinks.r.topic = testFlume1</li></ol>
type指向kafkasink所在的完整路径
下面的参数都是kafka的一系列参数,最重要的是brokerList和topic参数

现在重新启动flume,就可以在kafka的对应topic下查看到对应的日志

www.bkjia.comtruehttp://www.bkjia.com/PHPjc/1109725.htmlTechArticle使用flume+kafka+storm构建实时日志分析系统 本文只会涉及flume和kafka的结合,kafka和storm的结合可以参考其他博客 1. flume安装使用 下载flume安装...
声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
如何在 iPhone 和 Android 上关闭蓝色警报如何在 iPhone 和 Android 上关闭蓝色警报Feb 29, 2024 pm 10:10 PM

根据美国司法部的解释,蓝色警报旨在提供关于可能对执法人员构成直接和紧急威胁的个人的重要信息。这种警报的目的是及时通知公众,并让他们了解与这些罪犯相关的潜在危险。通过这种主动的方式,蓝色警报有助于增强社区的安全意识,促使人们采取必要的预防措施以保护自己和周围的人。这种警报系统的建立旨在提高对潜在威胁的警觉性,并加强执法机构与公众之间的沟通,以共尽管这些紧急通知对我们社会至关重要,但有时可能会对日常生活造成干扰,尤其是在午夜或重要活动时收到通知时。为了确保安全,我们建议您保持这些通知功能开启,但如果

在Android中实现轮询的方法是什么?在Android中实现轮询的方法是什么?Sep 21, 2023 pm 08:33 PM

Android中的轮询是一项关键技术,它允许应用程序定期从服务器或数据源检索和更新信息。通过实施轮询,开发人员可以确保实时数据同步并向用户提供最新的内容。它涉及定期向服务器或数据源发送请求并获取最新信息。Android提供了定时器、线程、后台服务等多种机制来高效地完成轮询。这使开发人员能够设计与远程数据源保持同步的响应式动态应用程序。本文探讨了如何在Android中实现轮询。它涵盖了实现此功能所涉及的关键注意事项和步骤。轮询定期检查更新并从服务器或源检索数据的过程在Android中称为轮询。通过

如何在Android中实现按下返回键再次退出的功能?如何在Android中实现按下返回键再次退出的功能?Aug 30, 2023 am 08:05 AM

为了提升用户体验并防止数据或进度丢失,Android应用程序开发者必须避免意外退出。他们可以通过加入“再次按返回退出”功能来实现这一点,该功能要求用户在特定时间内连续按两次返回按钮才能退出应用程序。这种实现显著提升了用户参与度和满意度,确保他们不会意外丢失任何重要信息Thisguideexaminesthepracticalstepstoadd"PressBackAgaintoExit"capabilityinAndroid.Itpresentsasystematicguid

Android逆向中smali复杂类实例分析Android逆向中smali复杂类实例分析May 12, 2023 pm 04:22 PM

1.java复杂类如果有什么地方不懂,请看:JAVA总纲或者构造方法这里贴代码,很简单没有难度。2.smali代码我们要把java代码转为smali代码,可以参考java转smali我们还是分模块来看。2.1第一个模块——信息模块这个模块就是基本信息,说明了类名等,知道就好对分析帮助不大。2.2第二个模块——构造方法我们来一句一句解析,如果有之前解析重复的地方就不再重复了。但是会提供链接。.methodpublicconstructor(Ljava/lang/String;I)V这一句话分为.m

如何在2023年将 WhatsApp 从安卓迁移到 iPhone 15?如何在2023年将 WhatsApp 从安卓迁移到 iPhone 15?Sep 22, 2023 pm 02:37 PM

如何将WhatsApp聊天从Android转移到iPhone?你已经拿到了新的iPhone15,并且你正在从Android跳跃?如果是这种情况,您可能还对将WhatsApp从Android转移到iPhone感到好奇。但是,老实说,这有点棘手,因为Android和iPhone的操作系统不兼容。但不要失去希望。这不是什么不可能完成的任务。让我们在本文中讨论几种将WhatsApp从Android转移到iPhone15的方法。因此,坚持到最后以彻底学习解决方案。如何在不删除数据的情况下将WhatsApp

同样基于linux为什么安卓效率低同样基于linux为什么安卓效率低Mar 15, 2023 pm 07:16 PM

原因:1、安卓系统上设置了一个JAVA虚拟机来支持Java应用程序的运行,而这种虚拟机对硬件的消耗是非常大的;2、手机生产厂商对安卓系统的定制与开发,增加了安卓系统的负担,拖慢其运行速度影响其流畅性;3、应用软件太臃肿,同质化严重,在一定程度上拖慢安卓手机的运行速度。

Android中动态导出dex文件的方法是什么Android中动态导出dex文件的方法是什么May 30, 2023 pm 04:52 PM

1.启动ida端口监听1.1启动Android_server服务1.2端口转发1.3软件进入调试模式2.ida下断2.1attach附加进程2.2断三项2.3选择进程2.4打开Modules搜索artPS:小知识Android4.4版本之前系统函数在libdvm.soAndroid5.0之后系统函数在libart.so2.5打开Openmemory()函数在libart.so中搜索Openmemory函数并且跟进去。PS:小知识一般来说,系统dex都会在这个函数中进行加载,但是会出现一个问题,后

Android APP测试流程和常见问题是什么Android APP测试流程和常见问题是什么May 13, 2023 pm 09:58 PM

1.自动化测试自动化测试主要包括几个部分,UI功能的自动化测试、接口的自动化测试、其他专项的自动化测试。1.1UI功能自动化测试UI功能的自动化测试,也就是大家常说的自动化测试,主要是基于UI界面进行的自动化测试,通过脚本实现UI功能的点击,替代人工进行自动化测试。这个测试的优势在于对高度重复的界面特性功能测试的测试人力进行有效的释放,利用脚本的执行,实现功能的快速高效回归。但这种测试的不足之处也是显而易见的,主要包括维护成本高,易发生误判,兼容性不足等。因为是基于界面操作,界面的稳定程度便成了

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
2 周前By尊渡假赌尊渡假赌尊渡假赌
仓库:如何复兴队友
4 周前By尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island冒险:如何获得巨型种子
3 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中

Atom编辑器mac版下载

Atom编辑器mac版下载

最流行的的开源编辑器

Dreamweaver Mac版

Dreamweaver Mac版

视觉化网页开发工具

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )专业的PHP集成开发工具

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。