分组 DataFrame 中的首行选择
在处理 Spark 中的复杂数据集时,通常需要根据特定条件从每个组中选择特定行。一种常见的情况是从每个组中选择第一行,并按特定列排序。
为了从 DataFrame 的每个组中选择第一行,可以使用几种方法:
窗口函数:
<code>import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window // 创建一个带有分组数据的 DataFrame val df = sc.parallelize(Seq((0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3), (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3), (2, "cat56", 39.6), (2, "cat40", 29.7), (2, "cat187", 27.9), (2, "cat68", 9.8), (3, "cat8", 35.6))).toDF("Hour", "Category", "TotalValue") // 创建窗口规范 val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc) // 计算每个组的行号 val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") // 显示每个组的第一行 dfTop.show</code>
简单的 SQL 聚合和连接:
<code>val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value")) val dfTopByJoin = df.join(broadcast(dfMax), ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value")) .drop("max_hour") .drop("max_value") dfTopByJoin.show</code>
结构体排序:
<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) .groupBy($"Hour") .agg(max("vs").alias("vs")) .select($"Hour", $"vs.Category", $"vs.TotalValue") dfTop.show</code>
DataSet API:
Spark 1.6:
<code>case class Record(Hour: Integer, Category: String, TotalValue: Double) df.as[Record] .groupBy($"Hour") .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) .show</code>
Spark 2.0 或更高版本:
<code>df.as[Record] .groupByKey(_.Hour) .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)</code>
这些方法提供了多种根据指定的排序条件从每个组中选择第一行的方法。方法的选择取决于具体的需要和性能考虑。
以上是如何从 Spark DataFrame 中的每个组中选择第一行?的详细内容。更多信息请关注PHP中文网其他相关文章!