ホームページ >データベース >mysql チュートリアル >Spark DataFrame の各グループの最初の行を選択するにはどうすればよいですか?
各グループの最初の行を選択
特定の並べ替え基準に基づいて各グループの最初の行を取得するには、いくつかの方法を使用できます。
ウィンドウ関数
<code class="language-scala">import org.apache.spark.sql.functions.{row_number, max, broadcast} import org.apache.spark.sql.expressions.Window // 创建一个用于分区和排序的窗口对象 val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc) // 添加一个排名列来标识每个分组的第一行 val dfTop = df.withColumn("rn", row_number.over(w)) // 过滤排名为1的行 dfTop.where($"rn" === 1).drop("rn")</code>
通常の SQL 集計と結合
<code class="language-scala">// 聚合以查找每个小时的最大值 val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value")) // 将原始DataFrame与聚合后的DataFrame连接 val dfTopByJoin = df.join(broadcast(dfMax), ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value")) // 删除不必要的列 dfTopByJoin.drop("max_hour").drop("max_value")</code>
構造ソート
<code class="language-scala">// 为包含TotalValue和Category的结构体定义别名 val vs = struct($"TotalValue", $"Category").alias("vs") // 按Hour分组并查找每个分组的最大结构体 val dfTop = df.select($"Hour", vs).groupBy($"Hour").agg(max(vs).alias("vs")) // 从最大结构体中提取Category和TotalValue dfTop.select($"Hour", $"vs.Category", $"vs.TotalValue")</code>
DataFrame API を使用する
<code class="language-scala">// 为DataFrame定义一个自定义类 case class Record(Hour: Integer, Category: String, TotalValue: Double) // 将DataFrame转换为自定义类 val dfRecords = df.as[Record] // 按Hour分组并减少以查找TotalValue最大的记录 val dfTopRecords = dfRecords.groupByKey(_.Hour).reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) // 转换回DataFrame dfTopRecords.toDF</code>
以上がSpark DataFrame の各グループの最初の行を選択するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。