首页 >数据库 >mysql教程 >如何从 Spark DataFrame 中的每个组中选择第一行?

如何从 Spark DataFrame 中的每个组中选择第一行?

Barbara Streisand
Barbara Streisand原创
2025-01-23 13:12:14439浏览

How to Select the First Row from Each Group in a Spark DataFrame?

分组 DataFrame 中的首行选择

在处理 Spark 中的复杂数据集时,通常需要根据特定条件从每个组中选择特定行。一种常见的情况是从每个组中选择第一行,并按特定列排序。

为了从 DataFrame 的每个组中选择第一行,可以使用几种方法:

窗口函数:

<code>import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// 创建一个带有分组数据的 DataFrame
val df = sc.parallelize(Seq((0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3),
  (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3),
  (2, "cat56", 39.6), (2, "cat40", 29.7), (2, "cat187", 27.9), (2, "cat68", 9.8),
  (3, "cat8", 35.6))).toDF("Hour", "Category", "TotalValue")

// 创建窗口规范
val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

// 计算每个组的行号
val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")

// 显示每个组的第一行
dfTop.show</code>

简单的 SQL 聚合和连接:

<code>val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(broadcast(dfMax), ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")

dfTopByJoin.show</code>

结构体排序:

<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")

dfTop.show</code>

DataSet API:

Spark 1.6:

<code>case class Record(Hour: Integer, Category: String, TotalValue: Double)

df.as[Record]
  .groupBy($"Hour")
  .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
  .show</code>

Spark 2.0 或更高版本:

<code>df.as[Record]
  .groupByKey(_.Hour)
  .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)</code>

这些方法提供了多种根据指定的排序条件从每个组中选择第一行的方法。方法的选择取决于具体的需要和性能考虑。

以上是如何从 Spark DataFrame 中的每个组中选择第一行?的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn