Heim >Datenbank >MySQL-Tutorial >MapReduce2.0处理机制
MapReduce(分布式计算模型)作为Hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。 内部模型采用分而治之的思想。MapReduce分为两部分(Map和Reduce)。其中Shuffler是对Reduce的预处理。 map和reduce的数据处理方式均采取键对的方式:即 [k1
MapReduce(分布式计算模型)作为Hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。
内部模型采用"分而治之"的思想。MapReduce分为两部分(Map和Reduce)。其中Shuffler是对Reduce的预处理。
map和reduce的数据处理方式均采取键值对的方式:即 [k1,v1]->MAP->[K2,V2]->Reduce->[k3,v3]。
MR执行流程
(1).客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar ...)
(2).JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
(3).client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
(4).开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
(5).JobTracker进行初始化任务
(6).读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
(7).TaskTracker通过心跳机制领取任务(任务的描述信息)
(8).下载所需的jar,配置文件等
(9).TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
(10).将结果写入到HDFS当中
在hadoop2.0以上版本中JobTracker取名为RM(resourceManage) TastTracker取名为NM(nodeManage)
mapReduce操作实现wordcount功能(即从文本中读取内容,计算出每个单词出现的次数)
程序分为3个类(自定义MAP方法功能实现,自定义REDUCE方法功能实现,最后类拼凑成mapreduce模式导成jar包,在HDFS分布式功能中实现)
1.WCMapper类(实现map)
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* 给wordcount写mapper
* 定义mapper
* KEYIN:k1的类型
* VALUEIN:v1的类型
*
* 重写map方法
* hadoop没有使用jdk默认的序列化机制(long->longwriteable String->Text)
*/
public class WCMapper extends Mapper
@Override
protected void map(LongWritable key, Text value,
Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// 接收信息V1
String line = value.toString();
// 切分数据
String[] words = line.split(" ");
// 循环
for (String w : words) {
// 出现一次记一个1,输出
// 构一个新的key,value
context.write(new Text(w), new LongWritable(1));
}
}
}
2.WCReducer类实现reduce功能
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* KEYIN k2的类型
* VALUEIN v2的类型
*
* 重写reducer方法
*/
public class WCReducer extends Reducer
@Override
protected void reduce(Text k2, Iterable
Reducer
throws IOException, InterruptedException {
// 接收数据
Text k3 = k2;
// 定义一个计数器
Long count = (long) 0;
// 循环v2s
for (LongWritable i : v2s) {
count += i.get();
}
// 输出
context.write(k3, new LongWritable(count));
}
}
3.wordCount类。拼凑前两个类,符合mapreduce格式
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
* mapReduce
*
* 组装自定义的map和reduce
*/
public class wordCount {
public static void main(String[] args) throws Exception {
// Job job=Job.instance(new Configuration()); //版本hadoop2
Job job = new Job(new Configuration()); // 版本hadoop1
// 4.注意---将main方法中的类设进去
job.setJarByClass(wordCount.class);
// 1.设置自定义Mapper
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 设置mapper读入的path(hdfs路径)
FileInputFormat.setInputPaths(job, new Path("/words.txt"));
// 2.设置reduce
job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path("/WcountResult"));
// 3.提交
job.waitForCompletion(true); // 打印进度和详情
}
}