// Word-count job: reads documents from MongoDB through the mongo-hadoop input
// format, counts the words found in each document's "type" field, and writes
// one (word, count) document per distinct word back through the output format.
val conf = new SparkConf().setAppName("Scala Word Count")
val sc = new SparkContext(conf)

// Hadoop configuration shared by the Mongo input and output formats.
val config = new Configuration()
config.set("mongo.input.uri", "mongodb://")
config.set("mongo.output.uri", "mongodb://")

val mongoRDD = sc.newAPIHadoopRDD(
  config,
  classOf[com.mongodb.hadoop.MongoInputFormat],
  classOf[Object],
  classOf[BSONObject]
)

// Input contains tuples of (ObjectId, BSONObject)
val countsRDD = mongoRDD
  .flatMap { arg =>
    // A document without a "type" field yields null from get(); wrap it in
    // Option so such documents contribute no words instead of throwing an NPE.
    Option(arg._2.get("type"))
      .map { value =>
        value.toString
          .toLowerCase()
          .replaceAll("[.,!?\n]", " ")
          .split(" ")
          // Punctuation replaced by spaces produces consecutive separators;
          // drop the resulting empty tokens so "" is not counted as a word.
          .filter(_.nonEmpty)
      }
      .getOrElse(Array.empty[String])
  }
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)

// Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
val saveRDD = countsRDD.map { case (word, count) =>
  val bson = new BasicBSONObject()
  bson.put("word", word)
  // The count is stored as a string, as in the original job.
  bson.put("count", count.toString)
  (null, bson)
}

// Only MongoOutputFormat and config are relevant
saveRDD.saveAsNewAPIHadoopFile(
  "file:///bogus",
  classOf[Any],
  classOf[Any],
  classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
  config
)
PHP中文网 2017-04-28 09:06:12
// Build an RDD over the Mongo input format, keyed by Object with BSONObject values.
val mongoRDD = sc.newAPIHadoopRDD(
  config,
  classOf[com.mongodb.hadoop.MongoInputFormat],
  classOf[Object],
  classOf[BSONObject]
)