ホームページ >データベース >mysql チュートリアル >Spark:一个高效的分布式计算系统
概述 什么是Spark Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写H
map
, filter
, flatMap
, sample
, groupByKey
, reduceByKey
, union
, join
, cogroup
, mapValues
, sort
,partionBy
等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count
, collect
, reduce
, lookup
, save
等多种actions操作。 RDD的特点:
RDD的好处
RDD的存储与分区
RDD的内部表示
在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:
RDD的存储级别
RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:
val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。
下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://...")
,file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:
// SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像 // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。 def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) } // 根据Hadoop配置,及InputFormat等创建HadoopRDD new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:
// 根据hadoop配置和分片从InputFormat中获取RecordReader进行数据的读取。 reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) val key: K = reader.createKey() val value: V = reader.createValue() //使用Hadoop MapReduce的RecordReader读取数据,每个Key、Value对以元组返回。 override def getNext() = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true } (key, value) }
下面使用一个例子来示例说明Transformations与Actions在Spark的使用。
val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A = sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1)) val rdd_C = sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10), 1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)
Spark对于资源管理与作业调度可以使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在现在的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就非常容易,Spark on Yarn的大致框架图。
让Spark运行于YARN上与Hadoop共用集群资源可以提高资源利用率。
Spark使用Scala开发,默认使用Scala作为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,可以在Spark-Shell测试程序。写SparK程序的一般步骤就是创建或使用(SparkContext)实例,使用SparkContext创建RDD,然后就是对RDD进行操作。如:
val sc = new SparkContext(master, appName, [sparkHome], [jars]) val textFile = sc.textFile("hdfs://.....") textFile.map(....).filter(.....).....
Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是一样的,因为都是JVM上的语言,Scala与Java可以互操作,Java编程接口其实就是对Scala的封装。如:
JavaSparkContext sc = new JavaSparkContext(...); JavaRDD lines = ctx.textFile("hdfs://..."); JavaRDD words = lines.flatMap( new FlatMapFunction() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } } );
现在Spark也提供了Python编程接口,Spark使用py4j来实现python与java的互操作,从而实现使用python编写Spark程序。Spark也同样提供了pyspark,一个Spark的python shell,可以以交互式的方式使用Python编写Spark程序。 如:
from pyspark import SparkContext sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) words = sc.textFile("/usr/share/dict/words") words.filter(lambda w: w.startswith("spar")).take(5)
以Standalone模式运行Spark集群
http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz
)修改配置(conf/*) slaves: 配置工作节点的主机名 spark-env.sh:配置环境变量。
SCALA_HOME=/home/spark/scala-2.9.3 JAVA_HOME=/home/spark/jdk1.6.0_45 SPARK_MASTER_IP=spark1 SPARK_MASTER_PORT=30111 SPARK_MASTER_WEBUI_PORT=30118 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g SPARK_WORKER_PORT=30333 SPARK_WORKER_WEBUI_PORT=30119 SPARK_WORKER_INSTANCES=1
把Hadoop配置copy到conf目录下
在master主机上对其它机器做ssh无密码登录
把配置好的Spark程序使用scp copy到其它机器
在master启动集群
$SPARK_HOME/start-all.sh
以Yarn模式运行Spark
下载Spark代码.
git clone git://github.com/mesos/spark
切换到branch-0.8
cd spark git checkout -b yarn --track origin/yarn
使用sbt编译Spark并
$SPARK_HOME/sbt/sbt > package > assembly
把Hadoop yarn配置copy到conf目录下
运行测试
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \ ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ --class spark.examples.SparkPi --args yarn-standalone
$SPARK_HOME/spark-shell
进入shell即可,在Spark-shell中SparkContext已经创建好了,实例名为sc可以直接使用,还有一个需要注意的是,在Standalone模式下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell作为一个Spark程序一直运行在Spark上,其它的Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。在Spark-shell上写程序非常简单,就像在Scala Shell上写程序一样。
scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 scala> textFile.count() // Number of items in this RDD res0: Long = 21374 scala> textFile.first() // First item in this RDD res1: String = # Spark
在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是一样的,不同的地方就是SparkContext需要自己创建。如WorkCount程序如下:
import spark.SparkContext import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length ==0 ){ println("usage is org.test.WordCount ") } println("the args: ") args.foreach(println) val hdfsPath = "hdfs://hadoop1:8020" // create the SparkContext, args(0)由yarn传入appMaster地址 val sc = new SparkContext(args(0), "WrodCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile(hdfsPath + args(1)) val result = textFile.flatMap(line => line.split("\\s+")) .map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(hdfsPath + args(2)) } }
原文地址:Spark:一个高效的分布式计算系统, 感谢原作者分享。