>  Q&A  >  본문

mongodb - spark集群中每个节点都有一个独立数据库,可以实现分布式统计计算吗?

我将spark搭建在两台机器上,其中一台即是master又是slave,另一台是slave,两台机器上均装有独立的mongodb数据库。我的主程序让它们统计自身数据库的内容,然后将结果汇总到一台服务器上的数据库里。目前代码是在master节点上提交的。但是我spark-submit之后,好像只统计master节点上的mongodb里的数据了,另一个worker节点没有统计上。请问这是什么原因?代码如下:

val conf = new SparkConf().setAppName("Scala Word Count")
val sc = new SparkContext(conf)

val config = new Configuration()

//以下代码表示只统计本机数据库上的数据,猜测问题可能出在这里
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/local.test")
//统计结果输出到服务器上
config.set("mongo.output.uri", "mongodb://103.25.23.80:60013/test_hao.result")

val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

// Input contains tuples of (ObjectId, BSONObject)
val countsRDD = mongoRDD.flatMap(arg => {
  var str = arg._2.get("type").toString
  str = str.toLowerCase().replaceAll("[.,!?\n]", " ")
  str.split(" ")
})
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)

// Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
val saveRDD = countsRDD.map((tuple) => {
  var bson = new BasicBSONObject()
  bson.put("word", tuple._1)
  bson.put("count", tuple._2.toString() )
  (null, bson)
})

// Only MongoOutputFormat and config are relevant
saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
天蓬老师天蓬老师2755일 전676

모든 응답(1)나는 대답할 것이다

  • PHP中文网

    PHP中文网2017-04-28 09:06:12

    직접 질문하고 답해 보세요. 그 이유는 다음과 같습니다.

    으아악

    이 코드 줄은 드라이버가 데이터베이스를 읽은 다음 정규화된 데이터를 RDD에 로드한다는 것을 나타냅니다. 이전 설정은 127.0.0.1을 입력으로 사용하는 것이었으므로, 즉 드라이버의 mongodb에서 데이터를 읽습니다. 드라이버가 마스터에 있기 때문에 읽혀지는 데이터는 당연히 마스터의 데이터입니다.

    회신하다
    0
  • 취소회신하다