집 >데이터 베이스 >MySQL 튜토리얼 >Spark DataFrame에서 각 그룹의 첫 번째 행을 효율적으로 선택하는 방법은 무엇입니까?
Hour, Category 및 TotalValue 열이 포함된 DataFrame이 있는 경우 작업은 각 그룹화의 첫 번째 행을 선택하는 것입니다. 여기서 각 그룹화는 Hour 및 Category의 고유한 조합으로 정의됩니다.
<code>import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc) val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")</code>
<code>val dfMax = df.groupBy($"Hour").agg(max($"TotalValue").as("max_value")) val dfTopByJoin = df.join(dfMax, ($"Hour" === dfMax("Hour")) && ($"TotalValue" === dfMax("max_value"))).drop(dfMax("Hour")).drop(dfMax("max_value"))</code>
<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) .groupBy($"Hour") .agg(max("vs").alias("vs")) .select($"Hour", $"vs.Category", $"vs.TotalValue")</code>
<code>import org.apache.spark.sql.Encoder import org.apache.spark.sql.expressions.Aggregator case class Record(Hour: Integer, Category: String, TotalValue: Double) def firstOfHour[T: Encoder : Aggregator]: TypedColumn[Record, Record] = { aggregator[Record, (Option[Record], Long)](Record(Hour = 0, Category = null, TotalValue = 0.0)) { (buffer, record) => if (record.Hour > buffer._2) buffer else (Some(record), record.Hour) } { (buffer1, buffer2) => if (buffer1._2 > buffer2._2) buffer1 else buffer2 } { x => x._1 match { case Some(r) => r case _ => Record(Hour = 0, Category = "0", TotalValue = 0.0) } } } df.as[Record].groupByKey(_.Hour).agg(firstOfHour[Record]).show</code>
위 내용은 Spark DataFrame에서 각 그룹의 첫 번째 행을 효율적으로 선택하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!