分組 DataFrame 中的首行選擇
在處理 Spark 中的複雜資料集時,通常需要根據特定條件從每個群組中選擇特定行。常見的情況是從每個群組中選擇第一行,並按特定列排序。
為了從 DataFrame 的每個群組中選擇第一行,可以使用幾種方法:
視窗函數:
<code>import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window // 创建一个带有分组数据的 DataFrame val df = sc.parallelize(Seq((0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3), (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3), (2, "cat56", 39.6), (2, "cat40", 29.7), (2, "cat187", 27.9), (2, "cat68", 9.8), (3, "cat8", 35.6))).toDF("Hour", "Category", "TotalValue") // 创建窗口规范 val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc) // 计算每个组的行号 val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") // 显示每个组的第一行 dfTop.show</code>
簡單的 SQL 聚合與連接:
<code>val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value")) val dfTopByJoin = df.join(broadcast(dfMax), ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value")) .drop("max_hour") .drop("max_value") dfTopByJoin.show</code>
結構體排序:
<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) .groupBy($"Hour") .agg(max("vs").alias("vs")) .select($"Hour", $"vs.Category", $"vs.TotalValue") dfTop.show</code>
DataSet API:
Spark 1.6:
<code>case class Record(Hour: Integer, Category: String, TotalValue: Double) df.as[Record] .groupBy($"Hour") .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) .show</code>
Spark 2.0 或更高版本:
<code>df.as[Record] .groupByKey(_.Hour) .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)</code>
這些方法提供了多種根據指定的排序條件從每個群組中選擇第一行的方法。方法的選擇取決於具體的需要和性能考慮。
以上是如何從 Spark DataFrame 中的每個群組中選擇第一行?的詳細內容。更多資訊請關注PHP中文網其他相關文章!