首頁 >資料庫 >mysql教程 >如何從 Spark DataFrame 中的每個群組中選擇第一行?

如何從 Spark DataFrame 中的每個群組中選擇第一行?

Barbara Streisand
Barbara Streisand原創
2025-01-23 13:12:14393瀏覽

How to Select the First Row from Each Group in a Spark DataFrame?

分組 DataFrame 中的首行選擇

在處理 Spark 中的複雜資料集時,通常需要根據特定條件從每個群組中選擇特定行。常見的情況是從每個群組中選擇第一行,並按特定列排序。

為了從 DataFrame 的每個群組中選擇第一行,可以使用幾種方法:

視窗函數:

<code>import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// 创建一个带有分组数据的 DataFrame
val df = sc.parallelize(Seq((0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3),
  (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3),
  (2, "cat56", 39.6), (2, "cat40", 29.7), (2, "cat187", 27.9), (2, "cat68", 9.8),
  (3, "cat8", 35.6))).toDF("Hour", "Category", "TotalValue")

// 创建窗口规范
val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

// 计算每个组的行号
val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")

// 显示每个组的第一行
dfTop.show</code>

簡單的 SQL 聚合與連接:

<code>val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(broadcast(dfMax), ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")

dfTopByJoin.show</code>

結構體排序:

<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")

dfTop.show</code>

DataSet API:

Spark 1.6:

<code>case class Record(Hour: Integer, Category: String, TotalValue: Double)

df.as[Record]
  .groupBy($"Hour")
  .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
  .show</code>

Spark 2.0 或更高版本:

<code>df.as[Record]
  .groupByKey(_.Hour)
  .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)</code>

這些方法提供了多種根據指定的排序條件從每個群組中選擇第一行的方法。方法的選擇取決於具體的需要和性能考慮。

以上是如何從 Spark DataFrame 中的每個群組中選擇第一行?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn