Home  >  Article  >  Database  >  MR总结(一)-Mapreduce原理解析

MR总结(一)-Mapreduce原理解析

WBOY
WBOYOriginal
2016-06-07 16:41:131365browse

本文主要内容: ★理解MapReduce基本原理 ★了解MapReduce应用的执行 ★理解MapReduce应用的设计 一、了解MapReduce MapReduce是一个框架,能够利用许多普通计算机对大规模的数据集进行高并发的、分布式的算法处理。 用户的任务是实现mapper与reducer,这两

本文主要内容:

★理解MapReduce基本原理

★了解MapReduce应用的执行

★理解MapReduce应用的设计

一、了解MapReduce

MapReduce是一个框架,能够利用许多普通计算机对大规模的数据集进行高并发的、分布式的算法处理。

用户的任务是实现mapper与reducer,这两个类会继承Hadoop提供的基础类来解决特殊的问题。就像图3-1中所示的,Mapper将key/value(k1,v1)键值对形式的数据作为输入,然后将他们转变为另一种key/value键值对形式。MapReduce框架对所有mapper输出的key/value进行排序,并将key值相同的所有value值合并(k2,{v2,v2,…})。这些kye/value结合后的会被传递到reducer模块,reducer会将他们转化为另一种key/value对(k3,v3)。

maper和reducer

maper和reducer

核心组件:

Mapper与Reducer和MapReduce框架。

Mapper功能:

一个mapper与reducer一起组成一个Hadoop作业。Mapper是作业的强制性部分,可以产生0个或者更多个key/value(k2,v2)键值对。

Reducer功能:

Reducer是作业的可选择性部分,可以没有产出或者产出更多的key/value对(k3,v3)。

MapReduce功能:

调度、同步、容错

MapReduce框架的主要任务(根据用户提供的代码)是统筹所有任务的协调执行。

包括

1.选择合适的机器(节点)运行mapper、启动与监控mapper的执行

2.为reducer的执行选择合适的节点,对mapper的输出进行排序与拉去并且传送给reducer节点、?启动与监控reducer的执行。

3.Mapreduce框架负责调度和监控任务,再次执行失败的任务。

现在我们对MapReduce有了一些了解,下面让我们进一步看看MapReduce作业是如何执行的。

二、Mapreduce作业执行

高层次的hadoop执行框架

高层次的hadoop执行框架

下面介绍MapReduce执行管道线的主要组件。

★? Driver:这是主程序,用来初始化MapReduce job。它定义了job的个性化配置,并且标注了所有的组件(包括输入输出格式,mapper与reducer,使用结合器,使用定制的分片器等等)。Driver也可以获得job执行的状态。

★? Context:driver、mapper与reducer在不同的阶段被执行,一般情况下是在多台节点上执行。context对象在MapReduce执行的任何阶段都可以被使用。它为交换需要的系统与job内部信息提供一种方便的机制。要注意context协调只发生在MapReduce job 开始后合适的阶段(driver,map或者reduce)。这意味着在一个mapper中设置的值不可以在另一个mapper中使用(即使另一个mapper在第一个mapper完成后开始),但是在任何reducer中都是有效的。

★? Input Data:为MapReduce任务准备的最初存储数据。这些数据可以在HDFS,HBase,或者其他的仓库中。一般情况下,input data 是非常大的,几十个G或者更多。

InputFormat:如何对输入数据进行读取和切分

★ ?InputFormat类确定input data中数据输入哪个任务的InputSplit,并且提供一个生成RecordReader的工厂方法,这个对象主要是读取inputSplit指定的文件。Hadoop提供了一些InputFormat类。InputFormat直接被job的driver调用来决定map任务执行的数目与地点(根据InputSplit)。

★? InputSplit:InputSplit确定一个在MapReduce中map任务的作业单元。处理一个数据集的MapReduce程序由几个(也可能是几百个)map任务组成。InputFormat(直接被job driver调用)确定在map阶段中map任务的数目。每个map任务操作一个单独的InputSplit。完成InputSplits的计算后,MapReduce框架会再合适的节点启动期望数目的map任务。

★? RecorReader:InputSplit确定map任务的工作机,但没有描述如何获得该数据。RecordReader类是真正从数据源读取数据的类(在map 任务中),并将数据转化为设和map执行的key/value对,并将他们传递给map方法。RecordReader由InputFormat定义。

★? Mapper:mapper负责在MapReduce程序中第一个阶段用户自定义作业的执行。从实现的角度看,mapper实现负责将输入数据转化成一些列的key/value对(k1,v1),这些键值对将被用于单个map的执行。一般情况下mapp会将输入的键值对转化为另一种输出键值对(k2,v2),这些输出键值对将会作为reduce阶段shuffle与sort阶段的输入。一个新的mapper实例在每个map任务的单独的JVM实体中被实例化, 这些map任务构成所有作业输出的一部分。独立的mapper是不会提供任何与其他mapper通信的机制。这一点保证每个map任务的可靠性仅仅由本地节点的可靠性决定。

★? Partition:由所有独立的mapper产生的中间数据(k2,v2)的子集会被分配到一个reducer上执行。这些子集(或者partitions)会作为reduce任务的输入。具备相同键的数值会被一个reduce处理,而不会考虑他们有哪个mapper产生。这样的结果是,所有的map节点必须判断产生的中间数据将有哪个reducer执行。Partitioner类决定特定的key/value对将由哪个reducer执行。默认的Partitioner会为每个key计算一个哈希值,并根据这个值作为分配的依据。

★? Shuffle:在Hadoop集群中,每个节点可能会执行某个job的几个map任务。一旦至少有一个map函数执行完成,产生的中间输出就会根据key值进行分片,并将由map产生的的分片分发至需要它们的reducer。将map的输出传递到reducer的过程叫做shuffling。

★? Sort:每个reduce任务负责处理与部分key值相对应的value。中间key/value数据集,在被传递给reducer前会由Hadoop框架自动排序,组装成(k2,{v2,v2,…})的形式。

★? Reducer:reducer负责执行由用户提供的用于完成某个作业第二阶段任务的代码。对于分配到某个reducer中的每个key,reducer的reduce()方法都会被调用一次。这个方法接收一个key值,由迭代器遍历与它绑定在一起的所有value值,并无序的返回与这个key值相关的value值。一般情况下,reducer将输入的key/value转化成输出键值对(k3,v3)。

★? OutputFormat:job的输出(job的输出可以由reducer产生,若没有reducer也可由map产生)记录的方式有OutputFormat控制。OutputFormat负责确定输出数据的地址,由RecordWriter负责将数据结果写入。

★? RecordWriter:RecordWriter定义每条output记录如何写入。

下面将介绍MapReduce执行时两个可选的组件

★? Combiner:这是一个可以优化MapReduce job执行的可选执行步骤。如何选择后,combiner运行在mapper执行后,reduce执行前。Combiner的实例会运行在每个map任务中与部分reduce任务中。Combiner接收由mapper实例输出的所有数据作为输入,并且尝试将具有相同key值的value整合,以此来减少key值的存储空间和减少必须存储的(实际上不必须)key值的数目。Combiner的输出会被排序并发送给reducer。

★? Distribute cache:另一个常用与MapReduce job中的工具是distribute cache。这个组件可以使得集群中所有节点共享数据。Distribute cache可以是能够被所有任务都能可获得的共享库,包含key/value对的全局查找文件,jar文件(或者是archives)包含可执行代码等等。该工具会将这些文件拷贝至实际执行任务的节点,并使它们可以在本地使用。

三、MapReduce编程模型

MapReduce编程模型主要包括Mapper和Reducer两个内部类和主方法。

下面直接看代码:

package com.sven.mrlearn;

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

public static class Map extends

Mapper {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

@Override

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}

public static class Reduce extends

Reducer {

@Override

public void reduce(Text key, Iterable val, Context context)

throws IOException, InterruptedException {

int sum = 0;

Iterator values = val.iterator();

while (values.hasNext()) {

sum += values.next().get();

}

context.write(key, new IntWritable(sum));

}

}

public int run(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = new Job(conf, “Word Count”);

job.setJarByClass(WordCount.class);

// Set up the input

job.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job, new Path(args[0]));

// Mapper

job.setMapperClass(Map.class);

// Reducer

job.setReducerClass(Reduce.class);

// Output

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

TextOutputFormat.setOutputPath(job, new Path(args[1]));

// Execute

boolean res = job.waitForCompletion(true);

if (res)

return 0;

else

return -1;

}

public static void main(String[] args) throws Exception {

int res = ToolRunner.run(new WordCount(), args);

System.exit(res);

}

}

Mapper静态内部类

mapper包含三个主要的方法:setup, cleanup, and map。其中map是我们必须手动实现的。

setup和cleanup在一个指定的mapper周期内只执行一次。因此我们可以再这里做一些任务初始化的工作,如在setup打开共享文件,打开hbase等数据库连接。

同理cleanup用于清理任务,释放资源。

map是这中做繁忙的方法,它不断接受接受键值对,不断处理键值对和通过context写出结果键值对。值得注意的是map并不直接读取记录,而是由reader(该组件可重写)读取

然后通过context传递给map。

那么map是怎么执行的呢?

我们打开Mapper类,有这么一个run方法:

/**

* Expert users can override this method for more complete control over the

* execution of the Mapper.

* @param context

* @throws IOException

*/

public void run(Context context) throws IOException, InterruptedException {

setup(context);

while (context.nextKeyValue()) {

map(context.getCurrentKey(), context.getCurrentValue(), context);

}

cleanup(context);

}

Reducer静态内部类

和Mapper类一样,Reducer类也有setup, cleanup, reduce,和一个run方法。

setup, cleanup, reduce方法和Mapper里面的setup, cleanup, map类似。唯一不同的是reduce接受的是一个key对应一个值的集合的迭代器。

(remember, a reducer is invoked ?after execution of shuffle and sort, at which point, all the input key/value pairs are sorted, and

all the values for the same key are partitioned to a single reducer and come together)

总结

这里是MapReduce的原理及编程模型的概要介绍。后面章节我们将介绍《MR总结-Mapreduce原理解析(一)》《MR总结-Mapreduce原理解析(一)》《MR总结-Mapreduce原理解析(一)》

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn