ホームページ >データベース >mysql チュートリアル >Spark DataFrame の各グループから最初の行を選択するにはどうすればよいですか?
グループ化された DataFrame の最初の行選択
Spark で複雑なデータセットを操作する場合、多くの場合、特定の基準に基づいて各グループから特定の行を選択する必要があります。一般的なシナリオは、各グループから最初の行を選択し、特定の列で並べ替えることです。
DataFrame の各グループから最初の行を選択するには、いくつかのメソッドを使用できます:
ウィンドウ関数:
<code>import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window // 创建一个带有分组数据的 DataFrame val df = sc.parallelize(Seq((0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3), (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3), (2, "cat56", 39.6), (2, "cat40", 29.7), (2, "cat187", 27.9), (2, "cat68", 9.8), (3, "cat8", 35.6))).toDF("Hour", "Category", "TotalValue") // 创建窗口规范 val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc) // 计算每个组的行号 val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") // 显示每个组的第一行 dfTop.show</code>
単純な SQL 集計と結合:
<code>val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value")) val dfTopByJoin = df.join(broadcast(dfMax), ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value")) .drop("max_hour") .drop("max_value") dfTopByJoin.show</code>
構造ソート:
<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) .groupBy($"Hour") .agg(max("vs").alias("vs")) .select($"Hour", $"vs.Category", $"vs.TotalValue") dfTop.show</code>
データセット API:
スパーク 1.6:
<code>case class Record(Hour: Integer, Category: String, TotalValue: Double) df.as[Record] .groupBy($"Hour") .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) .show</code>
Spark 2.0 以降:
<code>df.as[Record] .groupByKey(_.Hour) .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)</code>
これらのメソッドは、指定された並べ替え基準に基づいて各グループから最初の行を選択する複数の方法を提供します。方法の選択は、特定のニーズとパフォーマンスの考慮事項によって異なります。
以上がSpark DataFrame の各グループから最初の行を選択するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。