Rumah > Soal Jawab > teks badan
我将spark搭建在两台机器上,其中一台即是master又是slave,另一台是slave,两台机器上均装有独立的mongodb数据库。我的主程序让它们统计自身数据库的内容,然后将结果汇总到一台服务器上的数据库里。目前代码是在master节点上提交的。但是我spark-submit之后,好像只统计master节点上的mongodb里的数据了,另一个worker节点没有统计上。请问这是什么原因?代码如下:
val conf = new SparkConf().setAppName("Scala Word Count")
val sc = new SparkContext(conf)
val config = new Configuration()
//以下代码表示只统计本机数据库上的数据,猜测问题可能出在这里
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/local.test")
//统计结果输出到服务器上
config.set("mongo.output.uri", "mongodb://103.25.23.80:60013/test_hao.result")
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
// Input contains tuples of (ObjectId, BSONObject)
val countsRDD = mongoRDD.flatMap(arg => {
var str = arg._2.get("type").toString
str = str.toLowerCase().replaceAll("[.,!?\n]", " ")
str.split(" ")
})
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)
// Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
val saveRDD = countsRDD.map((tuple) => {
var bson = new BasicBSONObject()
bson.put("word", tuple._1)
bson.put("count", tuple._2.toString() )
(null, bson)
})
// Only MongoOutputFormat and config are relevant
saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
PHP中文网2017-04-28 09:06:12
Tanya dan jawab soalan anda sendiri. Sebabnya mungkin ini:
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
Barisan kod ini menunjukkan bahawa pemandu membaca pangkalan data dan kemudian memuatkan data yang layak ke dalam RDD Memandangkan tetapan sebelumnya adalah menggunakan 127.0.0.1 sebagai input, iaitu, data dibaca daripada mongodb pemandu. Memandangkan pemandu berada pada induk, data yang dibaca secara semula jadi adalah data pada induk.