hadoop之steaming介绍 hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明: 使用原生java语言实现Map-reduce程序 hadoop准备好数据
hadoop之steaming介绍
hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明:
使用原生java语言实现Map-reduce程序
- hadoop准备好数据后,将数据传送给java的map程序
- java的map程序将数据处理后,输出O1
- hadoop将O1打散、排序,然后传给不同的reduce机器
- 每个reduce机器将传来的数据传给reduce程序
- reduce程序将数据处理,输出最终数据O2
借助hadoop streaming使用python语言实现Map-reduce程序
- hadoop准备好数据后,将数据传送给java的map程序
- java的map程序将数据处理成“键/值”对,并传送给python的map程序
- python的map程序将数据处理后,将结果传回给java的map程序
- java的map程序将数据输出为O1
- hadoop将O1打散、排序,然后传给不同的reduce机器
- 每个reduce机器将传来的数据处理成“键/值”对,并传送给python的reduce程序
- python的reduce程序将数据处理后,将结果返回给java的reduce程序
- java的reduce程序将数据处理,输出最终数据O2
上面红色表示map的对比,蓝色表示reduce的对比,可以看出streaming程序多了一步中间处理,这样说来steaming程序的效率和性能应该低于java版的程序,然而python的开发效率、运行性能有时候会大于java,这就是streaming的优势所在。
hadoop之实现集合join的需求
hadoop是用来做数据分析的,大都是对集合进行操作,因此该过程中将集合join起来使得一个集合能得到另一个集合对应的信息的需求非常常见。
比如以下这个需求,有两份数据:学生信息(学号,姓名)和学生成绩(学号、课程、成绩),特点是有个共同的主键“学号”,现在需要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:
(学号,姓名) join (学号,课程,成绩)= (学号,姓名,课程,成绩)
数据事例1-学生信息:
学号sno | 姓名name |
01 | name1 |
02 | name2 |
03 | name3 |
04 | name4 |
数据事例2:-学生成绩:
学号sno | 课程号courseno | 成绩grade |
01 | 01 | 80 |
01 | 02 | 90 |
02 | 01 | 82 |
02 | 02 | 95 |
期待的最终输出:
学号sno | 姓名name | 课程courseno | 成绩grade |
01 | name1 | 01 | 80 |
01 | name1 | 02 | 90 |
02 | name2 | 01 | 82 |
02 | name2 | 02 | 95 |
实现join的注意点和易踩坑总结
如果你想写一个完善健壮的map reduce程序,我建议你首先弄清楚输入数据的格式、输出数据的格式,然后自己手动构建输入数据并手动计算出输出数据,这个过程中你会发现一些写程序中需要特别处理的地方:
- 实现join的key是哪个,是1个字段还是2个字段,本例中key是sno,1个字段
- 每个集合中key是否可以重复,本例中数据1不可重复,数据2的key可以重复
- 每个集合中key的对应值是否可以不存在,本例中有学生会没成绩,所以数据2的key可以为空
第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中具体的代码实现方式,第3条同样影响代码编写方式。
hadoop实现join操作的思路
具体思路是给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。
1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;
2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区
3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出
hadoop使用python实现join的map和reduce代码
mapper.py的代码:
# -*- coding: utf-8 -*- #Mapper.py #来自疯狂的蚂蚁www.crazyant.net import os import sys #mapper脚本 def mapper(): #获取当前正在处理的文件的名字,这里我们有两个输入文件 #所以要加以区分 filepath = os.environ["map_input_file"] filename = os.path.split(filepath)[-1] for line in sys.stdin: if line.strip()=="": continue fields = line[:-1].split("\t") sno = fields[0] #以下判断filename的目的是不同的文件有不同的字段,并且需加上不同的标记 if filename == 'data_info': name = fields[1] #下面的数字'0'就是为数据源1加上的统一标记 print '\t'.join((sno,'0',name)) elif filename == 'data_grade': courseno = fields[1] grade = fields[2] #下面的数字'1'就是为数据源1加上的统一标记 print '\t'.join((sno,'1',courseno,grade)) if __name__=='__main__': mapper()
reducer的代码:
# -*- coding: utf-8 -*- #reducer.py #来自疯狂的蚂蚁www.crazyant.net import sys def reducer(): #为了记录和上一个记录的区别,用lastsno记录上个sno lastsno = "" for line in sys.stdin: if line.strip()=="": continue fields = line[:-1].split("\t") sno = fields[0] ''' 处理思路: 遇见当前key与上一条key不同并且label=0,就记录下来name值, 当前key与上一条key相同并且label==1,则将本条数据的courseno、 grade联通上一条记录的name一起输出成最终结果 ''' if sno != lastsno: name="" #这里没有判断label==1的情况, #因为sno!=lastno,并且label=1表示该条key没有数据源1的数据 if fields[1]=="0": name=fields[2] elif sno==lastno: #这里没有判断label==0的情况, #因为sno==lastno并且label==0表示该条key没有数据源2的数据 if fields[2]=="1": courseno=fields[2] grade=fields[3] if name: print '\t'.join((lastsno,name,courseno,grade)) lastsno = sno if __name__=='__main__': reducer()
使用shell脚本启动hadoop程序的方法:
#先删除输出目录 ~/hadoop-client/hadoop/bin/hadoop fs -rmr /hdfs/jointest/output #来自疯狂的蚂蚁www.crazyant.net #注意,下面配置中的环境值每个人机器不一样 ~/hadoop-client/hadoop/bin/hadoop streaming \ -D mapred.map.tasks=10 \ -D mapred.reduce.tasks=5 \ -D mapred.job.map.capacity=10 \ -D mapred.job.reduce.capacity=5 \ -D mapred.job.name="join--sno_name-sno_courseno_grade" \ -D num.key.fields.for.partition=1 \ -D stream.num.map.output.key.fields=2 \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -input "/hdfs/jointest/input/*" \ -output "/hdfs/jointest/output" \ -mapper "python26/bin/python26.sh mapper.py" \ -reducer "python26/bin/python26.sh reducer.py" \ -file "mapper.py" \ -file "reducer.py" \ -cacheArchive "/share/python26.tar.gz#python26" #看看运行成功没,若输出0则表示成功了 echo $?
可以自己手工构造输入输出数据进行测试,本程序是验证过的。
更多需要注意的地方
hadoop的join操作可以分为很多类型,各种类型脚本的编写有所不同,其分类是按照key字段数目、value字段数目、key是否可重复来划分的,以下是一个个人总结的对照表,表示会影响的地方:
影响类型 | 影响的范围 |
key字段数目 | 1、启动脚本中num.key.fields.for.partition的配置2、启动脚本中stream.num.map.output.key.fields的配置 |
3、map和reduce脚本中key的获取
4、map和reduce脚本中每一条数据和上一条数据比较的方法key是否可重复如果数据源1可重复,标记为M;数据源2可重复标记为N,那么join可以分为:1*1、M*1、M*N类型
1*1类型:reduce中先记录第一个value,然后在下一条直接合并输出;
M*1类型:将类型1作为标记小的输出,然后每次遇见label=1就记录value,每遇见一次label=2就输出一次最终结果;
M*N类型:遇见类型1,就用数组记录value值,遇见label=2就将将记录的数组值全部连同该行value输出。value字段数目影响每次label=1时记录的数据个数,需要将value都记录下来
原文链接 转载须注明!
原文地址:Hadoop之使用python实现数据集合间join操作, 感谢原作者分享。

存储过程是MySQL中的预编译SQL语句集合,用于提高性能和简化复杂操作。1.提高性能:首次编译后,后续调用无需重新编译。2.提高安全性:通过权限控制限制数据表访问。3.简化复杂操作:将多条SQL语句组合,简化应用层逻辑。

MySQL查询缓存的工作原理是通过存储SELECT查询的结果,当相同查询再次执行时,直接返回缓存结果。1)查询缓存提高数据库读取性能,通过哈希值查找缓存结果。2)配置简单,在MySQL配置文件中设置query_cache_type和query_cache_size。3)使用SQL_NO_CACHE关键字可以禁用特定查询的缓存。4)在高频更新环境中,查询缓存可能导致性能瓶颈,需通过监控和调整参数优化使用。

MySQL被广泛应用于各种项目中的原因包括:1.高性能与可扩展性,支持多种存储引擎;2.易于使用和维护,配置简单且工具丰富;3.丰富的生态系统,吸引大量社区和第三方工具支持;4.跨平台支持,适用于多种操作系统。

MySQL数据库升级的步骤包括:1.备份数据库,2.停止当前MySQL服务,3.安装新版本MySQL,4.启动新版本MySQL服务,5.恢复数据库。升级过程需注意兼容性问题,并可使用高级工具如PerconaToolkit进行测试和优化。

MySQL备份策略包括逻辑备份、物理备份、增量备份、基于复制的备份和云备份。1.逻辑备份使用mysqldump导出数据库结构和数据,适合小型数据库和版本迁移。2.物理备份通过复制数据文件,速度快且全面,但需数据库一致性。3.增量备份利用二进制日志记录变化,适用于大型数据库。4.基于复制的备份通过从服务器备份,减少对生产系统的影响。5.云备份如AmazonRDS提供自动化解决方案,但成本和控制需考虑。选择策略时应考虑数据库大小、停机容忍度、恢复时间和恢复点目标。

MySQLclusteringenhancesdatabaserobustnessandscalabilitybydistributingdataacrossmultiplenodes.ItusestheNDBenginefordatareplicationandfaulttolerance,ensuringhighavailability.Setupinvolvesconfiguringmanagement,data,andSQLnodes,withcarefulmonitoringandpe

在MySQL中优化数据库模式设计可通过以下步骤提升性能:1.索引优化:在常用查询列上创建索引,平衡查询和插入更新的开销。2.表结构优化:通过规范化或反规范化减少数据冗余,提高访问效率。3.数据类型选择:使用合适的数据类型,如INT替代VARCHAR,减少存储空间。4.分区和分表:对于大数据量,使用分区和分表分散数据,提升查询和维护效率。

tooptimizemysqlperformance,lofterTheSeSteps:1)inasemproperIndexingTospeedUpqueries,2)使用ExplaintplaintoAnalyzeandoptimizequeryPerformance,3)ActiveServerConfigurationStersLikeTlikeTlikeTlikeIkeLikeIkeIkeLikeIkeLikeIkeLikeIkeLikeNodb_buffer_pool_sizizeandmax_connections,4)


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

SublimeText3 Linux新版
SublimeText3 Linux最新版

MinGW - 适用于 Windows 的极简 GNU
这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

mPDF
mPDF是一个PHP库,可以从UTF-8编码的HTML生成PDF文件。原作者Ian Back编写mPDF以从他的网站上“即时”输出PDF文件,并处理不同的语言。与原始脚本如HTML2FPDF相比,它的速度较慢,并且在使用Unicode字体时生成的文件较大,但支持CSS样式等,并进行了大量增强。支持几乎所有语言,包括RTL(阿拉伯语和希伯来语)和CJK(中日韩)。支持嵌套的块级元素(如P、DIV),

Dreamweaver Mac版
视觉化网页开发工具

SublimeText3汉化版
中文版,非常好用