찾다
데이터 베이스MySQL 튜토리얼Hadoop之使用python实现数据集合间join操作

Hadoop之使用python实现数据集合间join操作

Jun 07, 2016 pm 04:30 PM
hadoopjoinpython사용성취하다데이터모으다

hadoop之steaming介绍 hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明: 使用原生java语言实现Map-reduce程序 hadoop准备好数据

hadoop之steaming介绍

hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明:

使用原生java语言实现Map-reduce程序
  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理后,输出O1
  3. hadoop将O1打散、排序,然后传给不同的reduce机器
  4. 每个reduce机器将传来的数据传给reduce程序
  5. reduce程序将数据处理,输出最终数据O2
借助hadoop streaming使用python语言实现Map-reduce程序
  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理成“键/值”对,并传送给python的map程序
  3. python的map程序将数据处理后,将结果传回给java的map程序
  4. java的map程序将数据输出为O1
  5. hadoop将O1打散、排序,然后传给不同的reduce机器
  6. 每个reduce机器将传来的数据处理成“键/值”对,并传送给python的reduce程序
  7. python的reduce程序将数据处理后,将结果返回给java的reduce程序
  8. java的reduce程序将数据处理,输出最终数据O2

上面红色表示map的对比,蓝色表示reduce的对比,可以看出streaming程序多了一步中间处理,这样说来steaming程序的效率和性能应该低于java版的程序,然而python的开发效率、运行性能有时候会大于java,这就是streaming的优势所在。

hadoop之实现集合join的需求

hadoop是用来做数据分析的,大都是对集合进行操作,因此该过程中将集合join起来使得一个集合能得到另一个集合对应的信息的需求非常常见。

比如以下这个需求,有两份数据:学生信息(学号,姓名)和学生成绩(学号、课程、成绩),特点是有个共同的主键“学号”,现在需要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:

学号,姓名) join (学号,课程,成绩)= (学号,姓名,课程,成绩)

数据事例1-学生信息:

学号sno 姓名name
01 name1
02 name2
03 name3
04 name4

数据事例2:-学生成绩:

学号sno 课程号courseno 成绩grade
01 01 80
01 02 90
02 01 82
02 02 95

期待的最终输出:

学号sno 姓名name 课程courseno 成绩grade
01 name1 01 80
01 name1 02 90
02 name2 01 82
02 name2 02 95

实现join的注意点和易踩坑总结

如果你想写一个完善健壮的map reduce程序,我建议你首先弄清楚输入数据的格式、输出数据的格式,然后自己手动构建输入数据并手动计算出输出数据,这个过程中你会发现一些写程序中需要特别处理的地方:

  1. 实现join的key是哪个,是1个字段还是2个字段,本例中key是sno,1个字段
  2. 每个集合中key是否可以重复,本例中数据1不可重复,数据2的key可以重复
  3. 每个集合中key的对应值是否可以不存在,本例中有学生会没成绩,所以数据2的key可以为空

第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中具体的代码实现方式,第3条同样影响代码编写方式。

hadoop实现join操作的思路

具体思路是给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。

1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;

2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区

3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出

hadoop使用python实现join的map和reduce代码

mapper.py的代码:

# -*- coding: utf-8 -*-
#Mapper.py
#来自疯狂的蚂蚁www.crazyant.net
import os
import sys
#mapper脚本
def mapper():
	#获取当前正在处理的文件的名字,这里我们有两个输入文件
	#所以要加以区分
	filepath = os.environ["map_input_file"]
	filename = os.path.split(filepath)[-1]
	for line in sys.stdin:
		if line.strip()=="":
			continue
		fields = line[:-1].split("\t")
		sno = fields[0]
		#以下判断filename的目的是不同的文件有不同的字段,并且需加上不同的标记
		if filename == 'data_info':
			name = fields[1]
			#下面的数字'0'就是为数据源1加上的统一标记
			print '\t'.join((sno,'0',name))
		elif filename == 'data_grade':
			courseno = fields[1]
			grade = fields[2]
			#下面的数字'1'就是为数据源1加上的统一标记
			print '\t'.join((sno,'1',courseno,grade))
if __name__=='__main__':
	mapper()

reducer的代码:

# -*- coding: utf-8 -*-
#reducer.py
#来自疯狂的蚂蚁www.crazyant.net
import sys
def reducer():
	#为了记录和上一个记录的区别,用lastsno记录上个sno
	lastsno = ""
	for line in sys.stdin:
		if line.strip()=="":
			continue
		fields = line[:-1].split("\t")
		sno = fields[0]
		'''
		处理思路:
		遇见当前key与上一条key不同并且label=0,就记录下来name值,
		当前key与上一条key相同并且label==1,则将本条数据的courseno、
		grade联通上一条记录的name一起输出成最终结果
		'''
		if sno != lastsno:
			name=""
			#这里没有判断label==1的情况,
			#因为sno!=lastno,并且label=1表示该条key没有数据源1的数据
			if fields[1]=="0":
				name=fields[2]
		elif sno==lastno:
			#这里没有判断label==0的情况,
			#因为sno==lastno并且label==0表示该条key没有数据源2的数据
			if fields[2]=="1":
				courseno=fields[2]
				grade=fields[3]
				if name:
					print '\t'.join((lastsno,name,courseno,grade))
		lastsno = sno
if __name__=='__main__':
	reducer()

使用shell脚本启动hadoop程序的方法:

#先删除输出目录
~/hadoop-client/hadoop/bin/hadoop fs -rmr /hdfs/jointest/output
#来自疯狂的蚂蚁www.crazyant.net
#注意,下面配置中的环境值每个人机器不一样
~/hadoop-client/hadoop/bin/hadoop streaming \
	-D mapred.map.tasks=10 \
	-D mapred.reduce.tasks=5 \
	-D mapred.job.map.capacity=10 \
	-D mapred.job.reduce.capacity=5 \
	-D mapred.job.name="join--sno_name-sno_courseno_grade" \
	-D num.key.fields.for.partition=1 \
	-D stream.num.map.output.key.fields=2 \
	-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
	-input "/hdfs/jointest/input/*" \
	-output "/hdfs/jointest/output" \
	-mapper "python26/bin/python26.sh mapper.py" \
	-reducer "python26/bin/python26.sh reducer.py" \
	-file "mapper.py" \
	-file "reducer.py" \
	-cacheArchive "/share/python26.tar.gz#python26"
#看看运行成功没,若输出0则表示成功了
echo $?

可以自己手工构造输入输出数据进行测试,本程序是验证过的。

更多需要注意的地方

hadoop的join操作可以分为很多类型,各种类型脚本的编写有所不同,其分类是按照key字段数目、value字段数目、key是否可重复来划分的,以下是一个个人总结的对照表,表示会影响的地方:

影响类型 影响的范围
key字段数目 1、启动脚本中num.key.fields.for.partition的配置2、启动脚本中stream.num.map.output.key.fields的配置

3、map和reduce脚本中key的获取

4、map和reduce脚本中每一条数据和上一条数据比较的方法key是否可重复如果数据源1可重复,标记为M;数据源2可重复标记为N,那么join可以分为:1*1、M*1、M*N类型

1*1类型:reduce中先记录第一个value,然后在下一条直接合并输出;

M*1类型:将类型1作为标记小的输出,然后每次遇见label=1就记录value,每遇见一次label=2就输出一次最终结果;

M*N类型:遇见类型1,就用数组记录value值,遇见label=2就将将记录的数组值全部连同该行value输出。value字段数目影响每次label=1时记录的数据个数,需要将value都记录下来

原文链接 转载须注明!

성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
MySQL에 저장된 절차는 무엇입니까?MySQL에 저장된 절차는 무엇입니까?May 01, 2025 am 12:27 AM

저장된 절차는 성능을 향상시키고 복잡한 작업을 단순화하기 위해 MySQL에서 사전 컴파일 된 SQL 문입니다. 1. 성능 향상 : 첫 번째 편집 후 후속 통화를 다시 컴파일 할 필요가 없습니다. 2. 보안 향상 : 권한 제어를 통해 데이터 테이블 액세스를 제한합니다. 3. 복잡한 작업 단순화 : 여러 SQL 문을 결합하여 응용 프로그램 계층 로직을 단순화합니다.

쿼리 캐싱은 MySQL에서 어떻게 작동합니까?쿼리 캐싱은 MySQL에서 어떻게 작동합니까?May 01, 2025 am 12:26 AM

MySQL 쿼리 캐시의 작동 원리는 선택 쿼리 결과를 저장하는 것이며 동일한 쿼리가 다시 실행되면 캐시 된 결과가 직접 반환됩니다. 1) 쿼리 캐시는 데이터베이스 읽기 성능을 향상시키고 해시 값을 통해 캐시 된 결과를 찾습니다. 2) MySQL 구성 파일에서 간단한 구성, query_cache_type 및 query_cache_size를 설정합니다. 3) SQL_NO_CACHE 키워드를 사용하여 특정 쿼리의 캐시를 비활성화하십시오. 4) 고주파 업데이트 환경에서 쿼리 캐시는 성능 병목 현상을 유발할 수 있으며 매개 변수의 모니터링 및 조정을 통해 사용하기 위해 최적화해야합니다.

다른 관계형 데이터베이스를 통해 MySQL을 사용하면 어떤 장점이 있습니까?다른 관계형 데이터베이스를 통해 MySQL을 사용하면 어떤 장점이 있습니까?May 01, 2025 am 12:18 AM

MySQL이 다양한 프로젝트에서 널리 사용되는 이유에는 다음이 포함됩니다. 1. 고성능 및 확장 성, 여러 스토리지 엔진을 지원합니다. 2. 사용 및 유지 관리, 간단한 구성 및 풍부한 도구; 3. 많은 지역 사회 및 타사 도구 지원을 유치하는 풍부한 생태계; 4. 여러 운영 체제에 적합한 크로스 플랫폼 지원.

MySQL에서 데이터베이스 업그레이드를 어떻게 처리합니까?MySQL에서 데이터베이스 업그레이드를 어떻게 처리합니까?Apr 30, 2025 am 12:28 AM

MySQL 데이터베이스를 업그레이드하는 단계에는 다음이 포함됩니다. 1. 데이터베이스 백업, 2. 현재 MySQL 서비스 중지, 3. 새 버전의 MySQL 설치, 4. 새 버전의 MySQL 서비스 시작, 5. 데이터베이스 복구. 업그레이드 프로세스 중에 호환성 문제가 필요하며 Perconatoolkit과 같은 고급 도구를 테스트 및 최적화에 사용할 수 있습니다.

MySQL에 사용할 수있는 다른 백업 전략은 무엇입니까?MySQL에 사용할 수있는 다른 백업 전략은 무엇입니까?Apr 30, 2025 am 12:28 AM

MySQL 백업 정책에는 논리 백업, 물리적 백업, 증분 백업, 복제 기반 백업 및 클라우드 백업이 포함됩니다. 1. 논리 백업은 MySQLDump를 사용하여 데이터베이스 구조 및 데이터를 내보내며 소규모 데이터베이스 및 버전 마이그레이션에 적합합니다. 2. 물리적 백업은 데이터 파일을 복사하여 빠르고 포괄적이지만 데이터베이스 일관성이 필요합니다. 3. 증분 백업은 이진 로깅을 사용하여 변경 사항을 기록합니다. 이는 큰 데이터베이스에 적합합니다. 4. 복제 기반 백업은 서버에서 백업하여 생산 시스템에 미치는 영향을 줄입니다. 5. AmazonRDS와 같은 클라우드 백업은 자동화 솔루션을 제공하지만 비용과 제어를 고려해야합니다. 정책을 선택할 때 데이터베이스 크기, 가동 중지 시간 허용 오차, 복구 시간 및 복구 지점 목표를 고려해야합니다.

MySQL 클러스터링이란 무엇입니까?MySQL 클러스터링이란 무엇입니까?Apr 30, 2025 am 12:28 AM

mysqlclusteringenhancesdatabaserobustness andscalabilitydaturedingdataacrossmultiplenodes.itusesthendbenginefordatareplicationandfaulttolerance, highavailability를 보장합니다

MySQL의 성능을 위해 데이터베이스 스키마 설계를 어떻게 최적화합니까?MySQL의 성능을 위해 데이터베이스 스키마 설계를 어떻게 최적화합니까?Apr 30, 2025 am 12:27 AM

MySQL에서 데이터베이스 스키마 설계 최적화는 다음 단계를 통해 성능을 향상시킬 수 있습니다. 1. 인덱스 최적화 : 공통 쿼리 열에서 인덱스 생성, 쿼리의 오버 헤드 균형 및 업데이트 삽입. 2. 표 구조 최적화 : 정규화 또는 정상화를 통한 데이터 중복성을 줄이고 액세스 효율을 향상시킵니다. 3. 데이터 유형 선택 : 스토리지 공간을 줄이기 위해 Varchar 대신 Int와 같은 적절한 데이터 유형을 사용하십시오. 4. 분할 및 하위 테이블 : 대량 데이터 볼륨의 경우 파티션 및 하위 테이블을 사용하여 데이터를 분산시켜 쿼리 및 유지 보수 효율성을 향상시킵니다.

MySQL 성능을 어떻게 최적화 할 수 있습니까?MySQL 성능을 어떻게 최적화 할 수 있습니까?Apr 30, 2025 am 12:26 AM

tooptimizemysqlperformance, followthesesteps : 1) 구현 properIndexingToSpeedUpqueries, 2) useExplaintoAnalyzeanDoptimizeQueryPerformance, 3) AdvertServerConfigUrationSettingstingslikeInnodb_buffer_pool_sizeandmax_connections, 4) uspartOflEtOflEtOflestoI

See all articles

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

Video Face Swap

Video Face Swap

완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

뜨거운 도구

맨티스BT

맨티스BT

Mantis는 제품 결함 추적을 돕기 위해 설계된 배포하기 쉬운 웹 기반 결함 추적 도구입니다. PHP, MySQL 및 웹 서버가 필요합니다. 데모 및 호스팅 서비스를 확인해 보세요.

VSCode Windows 64비트 다운로드

VSCode Windows 64비트 다운로드

Microsoft에서 출시한 강력한 무료 IDE 편집기

WebStorm Mac 버전

WebStorm Mac 버전

유용한 JavaScript 개발 도구

mPDF

mPDF

mPDF는 UTF-8로 인코딩된 HTML에서 PDF 파일을 생성할 수 있는 PHP 라이브러리입니다. 원저자인 Ian Back은 자신의 웹 사이트에서 "즉시" PDF 파일을 출력하고 다양한 언어를 처리하기 위해 mPDF를 작성했습니다. HTML2FPDF와 같은 원본 스크립트보다 유니코드 글꼴을 사용할 때 속도가 느리고 더 큰 파일을 생성하지만 CSS 스타일 등을 지원하고 많은 개선 사항이 있습니다. RTL(아랍어, 히브리어), CJK(중국어, 일본어, 한국어)를 포함한 거의 모든 언어를 지원합니다. 중첩된 블록 수준 요소(예: P, DIV)를 지원합니다.

SublimeText3 Linux 새 버전

SublimeText3 Linux 새 버전

SublimeText3 Linux 최신 버전