Heim >Datenbank >MySQL-Tutorial >MapReduce2.0处理机制

MapReduce2.0处理机制

WBOY
WBOYOriginal
2016-06-07 15:08:491267Durchsuche

MapReduce(分布式计算模型)作为Hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。 内部模型采用分而治之的思想。MapReduce分为两部分(Map和Reduce)。其中Shuffler是对Reduce的预处理。 map和reduce的数据处理方式均采取键对的方式:即 [k1

                      MapReduce(分布式计算模型)作为Hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。

                 内部模型采用"分而治之"的思想。MapReduce分为两部分(Map和Reduce)。其中Shuffler是对Reduce的预处理。MapReduce2.0处理机制

map和reduce的数据处理方式均采取键值对的方式:即  [k1,v1]->MAP->[K2,V2]->Reduce->[k3,v3]。

MR执行流程
 (1).客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar ...)
 (2).JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
 (3).client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
 (4).开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
 (5).JobTracker进行初始化任务
 (6).读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
 (7).TaskTracker通过心跳机制领取任务(任务的描述信息)
 (8).下载所需的jar,配置文件等
 (9).TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
 (10).将结果写入到HDFS当中

在hadoop2.0以上版本中JobTracker取名为RM(resourceManage)  TastTracker取名为NM(nodeManage)

MapReduce2.0处理机制

mapReduce操作实现wordcount功能(即从文本中读取内容,计算出每个单词出现的次数)

程序分为3个类(自定义MAP方法功能实现,自定义REDUCE方法功能实现,最后类拼凑成mapreduce模式导成jar包,在HDFS分布式功能中实现)

1.WCMapper类(实现map)

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * 给wordcount写mapper
 * 定义mapper
 * KEYIN:k1的类型
 * VALUEIN:v1的类型
 *
 * 重写map方法
 * hadoop没有使用jdk默认的序列化机制(long->longwriteable String->Text)
 */
public class WCMapper extends Mapper {

 @Override
 protected void map(LongWritable key, Text value,
   Mapper.Context context)
   throws IOException, InterruptedException {
  // TODO Auto-generated method stub

  // 接收信息V1
  String line = value.toString();
  // 切分数据
  String[] words = line.split(" ");
  // 循环
  for (String w : words) {
   // 出现一次记一个1,输出
   // 构一个新的key,value
   context.write(new Text(w), new LongWritable(1));
  }
 }

}

2.WCReducer类实现reduce功能

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * KEYIN k2的类型
 * VALUEIN v2的类型
 *
 * 重写reducer方法
 */
public class WCReducer extends Reducer {

 @Override
 protected void reduce(Text k2, Iterable v2s,
   Reducer.Context context)
   throws IOException, InterruptedException {
  // 接收数据
  Text k3 = k2;
  // 定义一个计数器
  Long count = (long) 0;
  // 循环v2s
  for (LongWritable i : v2s) {
   count += i.get();
  }
  // 输出
  context.write(k3, new LongWritable(count));
 }

}

3.wordCount类。拼凑前两个类,符合mapreduce格式


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * mapReduce
 *
 * 组装自定义的map和reduce
 */
public class wordCount {
 public static void main(String[] args) throws Exception {
  // Job job=Job.instance(new Configuration()); //版本hadoop2
  Job job = new Job(new Configuration()); // 版本hadoop1

  // 4.注意---将main方法中的类设进去
  job.setJarByClass(wordCount.class);

  // 1.设置自定义Mapper
  job.setMapperClass(WCMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(LongWritable.class);

  // 设置mapper读入的path(hdfs路径)
  FileInputFormat.setInputPaths(job, new Path("/words.txt"));

  // 2.设置reduce
  job.setReducerClass(WCReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);

  FileOutputFormat.setOutputPath(job, new Path("/WcountResult"));

  // 3.提交
  job.waitForCompletion(true); // 打印进度和详情
 }
}

 

 

 

 

 

 

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn