Maison  >  Article  >  base de données  >  MR总结(一)-Mapreduce原理解析

MR总结(一)-Mapreduce原理解析

WBOY
WBOYoriginal
2016-06-07 16:41:131365parcourir

本文主要内容: ★理解MapReduce基本原理 ★了解MapReduce应用的执行 ★理解MapReduce应用的设计 一、了解MapReduce MapReduce是一个框架,能够利用许多普通计算机对大规模的数据集进行高并发的、分布式的算法处理。 用户的任务是实现mapper与reducer,这两

本文主要内容:

★理解MapReduce基本原理

★了解MapReduce应用的执行

★理解MapReduce应用的设计

一、了解MapReduce

MapReduce是一个框架,能够利用许多普通计算机对大规模的数据集进行高并发的、分布式的算法处理。

用户的任务是实现mapper与reducer,这两个类会继承Hadoop提供的基础类来解决特殊的问题。就像图3-1中所示的,Mapper将key/value(k1,v1)键值对形式的数据作为输入,然后将他们转变为另一种key/value键值对形式。MapReduce框架对所有mapper输出的key/value进行排序,并将key值相同的所有value值合并(k2,{v2,v2,…})。这些kye/value结合后的会被传递到reducer模块,reducer会将他们转化为另一种key/value对(k3,v3)。

maper和reducer

maper和reducer

核心组件:

Mapper与Reducer和MapReduce框架。

Mapper功能:

一个mapper与reducer一起组成一个Hadoop作业。Mapper是作业的强制性部分,可以产生0个或者更多个key/value(k2,v2)键值对。

Reducer功能:

Reducer是作业的可选择性部分,可以没有产出或者产出更多的key/value对(k3,v3)。

MapReduce功能:

调度、同步、容错

MapReduce框架的主要任务(根据用户提供的代码)是统筹所有任务的协调执行。

包括

1.选择合适的机器(节点)运行mapper、启动与监控mapper的执行

2.为reducer的执行选择合适的节点,对mapper的输出进行排序与拉去并且传送给reducer节点、?启动与监控reducer的执行。

3.Mapreduce框架负责调度和监控任务,再次执行失败的任务。

现在我们对MapReduce有了一些了解,下面让我们进一步看看MapReduce作业是如何执行的。

二、Mapreduce作业执行

高层次的hadoop执行框架

高层次的hadoop执行框架

下面介绍MapReduce执行管道线的主要组件。

★? Driver:这是主程序,用来初始化MapReduce job。它定义了job的个性化配置,并且标注了所有的组件(包括输入输出格式,mapper与reducer,使用结合器,使用定制的分片器等等)。Driver也可以获得job执行的状态。

★? Context:driver、mapper与reducer在不同的阶段被执行,一般情况下是在多台节点上执行。context对象在MapReduce执行的任何阶段都可以被使用。它为交换需要的系统与job内部信息提供一种方便的机制。要注意context协调只发生在MapReduce job 开始后合适的阶段(driver,map或者reduce)。这意味着在一个mapper中设置的值不可以在另一个mapper中使用(即使另一个mapper在第一个mapper完成后开始),但是在任何reducer中都是有效的。

★? Input Data:为MapReduce任务准备的最初存储数据。这些数据可以在HDFS,HBase,或者其他的仓库中。一般情况下,input data 是非常大的,几十个G或者更多。

InputFormat:如何对输入数据进行读取和切分

★ ?InputFormat类确定input data中数据输入哪个任务的InputSplit,并且提供一个生成RecordReader的工厂方法,这个对象主要是读取inputSplit指定的文件。Hadoop提供了一些InputFormat类。InputFormat直接被job的driver调用来决定map任务执行的数目与地点(根据InputSplit)。

★? InputSplit:InputSplit确定一个在MapReduce中map任务的作业单元。处理一个数据集的MapReduce程序由几个(也可能是几百个)map任务组成。InputFormat(直接被job driver调用)确定在map阶段中map任务的数目。每个map任务操作一个单独的InputSplit。完成InputSplits的计算后,MapReduce框架会再合适的节点启动期望数目的map任务。

★? RecorReader:InputSplit确定map任务的工作机,但没有描述如何获得该数据。RecordReader类是真正从数据源读取数据的类(在map 任务中),并将数据转化为设和map执行的key/value对,并将他们传递给map方法。RecordReader由InputFormat定义。

★? Mapper:mapper负责在MapReduce程序中第一个阶段用户自定义作业的执行。从实现的角度看,mapper实现负责将输入数据转化成一些列的key/value对(k1,v1),这些键值对将被用于单个map的执行。一般情况下mapp会将输入的键值对转化为另一种输出键值对(k2,v2),这些输出键值对将会作为reduce阶段shuffle与sort阶段的输入。一个新的mapper实例在每个map任务的单独的JVM实体中被实例化, 这些map任务构成所有作业输出的一部分。独立的mapper是不会提供任何与其他mapper通信的机制。这一点保证每个map任务的可靠性仅仅由本地节点的可靠性决定。

★? Partition:由所有独立的mapper产生的中间数据(k2,v2)的子集会被分配到一个reducer上执行。这些子集(或者partitions)会作为reduce任务的输入。具备相同键的数值会被一个reduce处理,而不会考虑他们有哪个mapper产生。这样的结果是,所有的map节点必须判断产生的中间数据将有哪个reducer执行。Partitioner类决定特定的key/value对将由哪个reducer执行。默认的Partitioner会为每个key计算一个哈希值,并根据这个值作为分配的依据。

★? Shuffle:在Hadoop集群中,每个节点可能会执行某个job的几个map任务。一旦至少有一个map函数执行完成,产生的中间输出就会根据key值进行分片,并将由map产生的的分片分发至需要它们的reducer。将map的输出传递到reducer的过程叫做shuffling。

★? Sort:每个reduce任务负责处理与部分key值相对应的value。中间key/value数据集,在被传递给reducer前会由Hadoop框架自动排序,组装成(k2,{v2,v2,…})的形式。

★? Reducer:reducer负责执行由用户提供的用于完成某个作业第二阶段任务的代码。对于分配到某个reducer中的每个key,reducer的reduce()方法都会被调用一次。这个方法接收一个key值,由迭代器遍历与它绑定在一起的所有value值,并无序的返回与这个key值相关的value值。一般情况下,reducer将输入的key/value转化成输出键值对(k3,v3)。

★? OutputFormat:job的输出(job的输出可以由reducer产生,若没有reducer也可由map产生)记录的方式有OutputFormat控制。OutputFormat负责确定输出数据的地址,由RecordWriter负责将数据结果写入。

★? RecordWriter:RecordWriter定义每条output记录如何写入。

下面将介绍MapReduce执行时两个可选的组件

★? Combiner:这是一个可以优化MapReduce job执行的可选执行步骤。如何选择后,combiner运行在mapper执行后,reduce执行前。Combiner的实例会运行在每个map任务中与部分reduce任务中。Combiner接收由mapper实例输出的所有数据作为输入,并且尝试将具有相同key值的value整合,以此来减少key值的存储空间和减少必须存储的(实际上不必须)key值的数目。Combiner的输出会被排序并发送给reducer。

★? Distribute cache:另一个常用与MapReduce job中的工具是distribute cache。这个组件可以使得集群中所有节点共享数据。Distribute cache可以是能够被所有任务都能可获得的共享库,包含key/value对的全局查找文件,jar文件(或者是archives)包含可执行代码等等。该工具会将这些文件拷贝至实际执行任务的节点,并使它们可以在本地使用。

三、MapReduce编程模型

MapReduce编程模型主要包括Mapper和Reducer两个内部类和主方法。

下面直接看代码:

package com.sven.mrlearn;

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

public static class Map extends

Mapper {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

@Override

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}

public static class Reduce extends

Reducer {

@Override

public void reduce(Text key, Iterable val, Context context)

throws IOException, InterruptedException {

int sum = 0;

Iterator values = val.iterator();

while (values.hasNext()) {

sum += values.next().get();

}

context.write(key, new IntWritable(sum));

}

}

public int run(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = new Job(conf, “Word Count”);

job.setJarByClass(WordCount.class);

// Set up the input

job.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job, new Path(args[0]));

// Mapper

job.setMapperClass(Map.class);

// Reducer

job.setReducerClass(Reduce.class);

// Output

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

TextOutputFormat.setOutputPath(job, new Path(args[1]));

// Execute

boolean res = job.waitForCompletion(true);

if (res)

return 0;

else

return -1;

}

public static void main(String[] args) throws Exception {

int res = ToolRunner.run(new WordCount(), args);

System.exit(res);

}

}

Mapper静态内部类

mapper包含三个主要的方法:setup, cleanup, and map。其中map是我们必须手动实现的。

setup和cleanup在一个指定的mapper周期内只执行一次。因此我们可以再这里做一些任务初始化的工作,如在setup打开共享文件,打开hbase等数据库连接。

同理cleanup用于清理任务,释放资源。

map是这中做繁忙的方法,它不断接受接受键值对,不断处理键值对和通过context写出结果键值对。值得注意的是map并不直接读取记录,而是由reader(该组件可重写)读取

然后通过context传递给map。

那么map是怎么执行的呢?

我们打开Mapper类,有这么一个run方法:

/**

* Expert users can override this method for more complete control over the

* execution of the Mapper.

* @param context

* @throws IOException

*/

public void run(Context context) throws IOException, InterruptedException {

setup(context);

while (context.nextKeyValue()) {

map(context.getCurrentKey(), context.getCurrentValue(), context);

}

cleanup(context);

}

Reducer静态内部类

和Mapper类一样,Reducer类也有setup, cleanup, reduce,和一个run方法。

setup, cleanup, reduce方法和Mapper里面的setup, cleanup, map类似。唯一不同的是reduce接受的是一个key对应一个值的集合的迭代器。

(remember, a reducer is invoked ?after execution of shuffle and sort, at which point, all the input key/value pairs are sorted, and

all the values for the same key are partitioned to a single reducer and come together)

总结

这里是MapReduce的原理及编程模型的概要介绍。后面章节我们将介绍《MR总结-Mapreduce原理解析(一)》《MR总结-Mapreduce原理解析(一)》《MR总结-Mapreduce原理解析(一)》

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn