Maison  >  Article  >  Java  >  Explication détaillée de la sérialisation des objets et de leur stockage dans HDFS dans Spark

Explication détaillée de la sérialisation des objets et de leur stockage dans HDFS dans Spark

零下一度
零下一度original
2017-06-17 11:42:072243parcourir

Cet article présente principalement les informations pertinentes sur la façon de sérialiser des objets et de les stocker dans hdfs dans Spark en Java. Les amis dans le besoin peuvent se référer à

Spark dans Java Object. sérialisation et stockage vers HDFS

Résumé : Les applications Spark rencontrent souvent une telle exigence : les objets JAVA doivent être sérialisés et stockés dans HDFS, notamment à l'aide de calculs MLlib Certains des les modèles sont générés et stockés dans hdfs afin que les modèles puissent être utilisés à plusieurs reprises. L'exemple suivant montre la lecture de données depuis Hbase dans l'environnement Spark, la génération d'un modèle word2vec et son stockage dans hdfs.

Sans plus tarder, postons simplement le code spark1.4 + hbase0.98


import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }

 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  

  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")

  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))

  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }

  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重点看这里-------------------------------------------------------------
  //将上面的模型存储到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //这里示例另外一个程序直接从hdfs读取序列化对象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}
.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn