

mongodb - Each node in a Spark cluster has its own independent database. Can distributed statistical computation be done this way?


import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.{BSONObject, BasicBSONObject}
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}

val conf = new SparkConf().setAppName("Scala Word Count")
val sc = new SparkContext(conf)

// Hadoop configuration read by mongo-hadoop's input and output formats
val config = new Configuration()

// Connection strings (host, database and collection were omitted in the post)
config.set("mongo.input.uri", "mongodb://")
config.set("mongo.output.uri", "mongodb://")

// Input contains tuples of (ObjectId, BSONObject)
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])

// Split each document's "type" field into lowercase words and count them
val countsRDD = mongoRDD.flatMap(arg => {
  val str = arg._2.get("type").toString
    .toLowerCase()
    .replaceAll("[.,!?\n]", " ")
  str.split(" ").filter(_.nonEmpty) // skip empty tokens from adjacent separators
})
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)

// Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
val saveRDD = countsRDD.map(tuple => {
  val bson = new BasicBSONObject()
  bson.put("word", tuple._1)
  bson.put("count", tuple._2.toString())
  (null, bson)
})

// Only MongoOutputFormat and config are relevant; the file path is ignored
saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[MongoOutputFormat[Any, Any]], config)
Asked by 天蓬老师, 2862 days ago
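The tokenize-and-count logic in this job can be checked on a single machine before involving the cluster. Below is a minimal plain-Scala sketch, assuming each input document is reduced to the string in its "type" field. The names tokenize and wordCounts are hypothetical, and the groupBy step stands in for Spark's reduceByKey; empty tokens produced by adjacent separators are dropped.

```scala
// Plain-Scala model of the word-count job; no Spark or MongoDB needed.
// Each input string stands for one document's "type" field (assumption).
def tokenize(text: String): Seq[String] =
  text.toLowerCase
    .replaceAll("[.,!?\n]", " ") // same punctuation set as the Spark job
    .split(" ")
    .toSeq
    .filter(_.nonEmpty) // drop empty tokens from adjacent separators

// Single-machine equivalent of flatMap(tokenize).map((_, 1)).reduceByKey(_ + _)
def wordCounts(docs: Seq[String]): Map[String, Int] =
  docs.flatMap(tokenize)
    .groupBy(identity)
    .map { case (word, occurrences) => word -> occurrences.size }
```

For example, wordCounts(Seq("Hello, world!", "hello world")) yields hello -> 2 and world -> 2.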


  • PHP中文网, 2017-04-28 09:06:12

    Answering my own question. The likely cause is this line:

    val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

    This line makes the driver query MongoDB and load the matching documents into an RDD. The input was configured earlier through mongo.input.uri, so the data is read from the driver's MongoDB. Since the driver runs on the master, the data that gets read is naturally only the master's data; the databases on the other nodes are never queried.
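If every node really does hold its own slice of the data, the global statistic is just the sum of the per-node partial counts, which is what reduceByKey computes once every partition's data has actually been read. Below is a minimal plain-Scala sketch of that merge step, assuming each node's partial counts are already available as a Map[String, Int]. mergeCounts is a hypothetical helper; getting Spark to read each node's local database (instead of only the one named in mongo.input.uri) is a separate problem this sketch does not address.

```scala
// Hypothetical helper: combine per-node partial word counts into one global count.
// Each Map models the counts computed from one node's local MongoDB (assumption).
def mergeCounts(partials: Seq[Map[String, Int]]): Map[String, Int] =
  partials.foldLeft(Map.empty[String, Int]) { (acc, partial) =>
    partial.foldLeft(acc) { case (merged, (word, n)) =>
      // Add this node's count to whatever has been accumulated so far
      merged.updated(word, merged.getOrElse(word, 0) + n)
    }
  }
```

Because summing counts is associative and commutative, the order in which node results are merged does not affect the result, which is also what lets Spark apply the same reduction across partitions.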
