Rumah >pangkalan data >tutorial mysql >Bagaimana untuk Memilih Baris Pertama Setiap Kumpulan dalam Spark DataFrame?
Pilih baris pertama setiap kumpulan
Untuk mendapatkan semula baris pertama setiap kumpulan berdasarkan kriteria pengisihan tertentu, anda boleh menggunakan beberapa kaedah:
Fungsi tetingkap
<code class="language-scala">import org.apache.spark.sql.functions.{row_number, max, broadcast} import org.apache.spark.sql.expressions.Window // 创建一个用于分区和排序的窗口对象 val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc) // 添加一个排名列来标识每个分组的第一行 val dfTop = df.withColumn("rn", row_number.over(w)) // 过滤排名为1的行 dfTop.where($"rn" === 1).drop("rn")</code>
Penggabungan dan gabungan SQL biasa
<code class="language-scala">// 聚合以查找每个小时的最大值 val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value")) // 将原始DataFrame与聚合后的DataFrame连接 val dfTopByJoin = df.join(broadcast(dfMax), ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value")) // 删除不必要的列 dfTopByJoin.drop("max_hour").drop("max_value")</code>
Isih struktur
<code class="language-scala">// 为包含TotalValue和Category的结构体定义别名 val vs = struct($"TotalValue", $"Category").alias("vs") // 按Hour分组并查找每个分组的最大结构体 val dfTop = df.select($"Hour", vs).groupBy($"Hour").agg(max(vs).alias("vs")) // 从最大结构体中提取Category和TotalValue dfTop.select($"Hour", $"vs.Category", $"vs.TotalValue")</code>
Gunakan API DataFrame
<code class="language-scala">// 为DataFrame定义一个自定义类 case class Record(Hour: Integer, Category: String, TotalValue: Double) // 将DataFrame转换为自定义类 val dfRecords = df.as[Record] // 按Hour分组并减少以查找TotalValue最大的记录 val dfTopRecords = dfRecords.groupByKey(_.Hour).reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) // 转换回DataFrame dfTopRecords.toDF</code>
Atas ialah kandungan terperinci Bagaimana untuk Memilih Baris Pertama Setiap Kumpulan dalam Spark DataFrame?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!