Maison >base de données >tutoriel mysql >Comment sélectionner la première ligne de chaque groupe dans un Spark DataFrame ?

Comment sélectionner la première ligne de chaque groupe dans un Spark DataFrame ?

Barbara Streisand
Barbara Streisandoriginal
2025-01-23 13:12:14393parcourir

How to Select the First Row from Each Group in a Spark DataFrame?

Sélection de la première ligne dans un DataFrame groupé

Lorsque vous travaillez avec des ensembles de données complexes dans Spark, vous devez souvent sélectionner des lignes spécifiques de chaque groupe en fonction de critères spécifiques. Un scénario courant consiste à sélectionner la première ligne de chaque groupe et à la trier selon une colonne spécifique.

Afin de sélectionner la première ligne de chaque groupe du DataFrame, plusieurs méthodes peuvent être utilisées :

Fonction fenêtre :

<code>import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// 创建一个带有分组数据的 DataFrame
val df = sc.parallelize(Seq((0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3),
  (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3),
  (2, "cat56", 39.6), (2, "cat40", 29.7), (2, "cat187", 27.9), (2, "cat68", 9.8),
  (3, "cat8", 35.6))).toDF("Hour", "Category", "TotalValue")

// 创建窗口规范
val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

// 计算每个组的行号
val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")

// 显示每个组的第一行
dfTop.show</code>

Agrégations et jointures SQL simples :

<code>val dfMax = df.groupBy($"Hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(broadcast(dfMax), ($"Hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")

dfTopByJoin.show</code>

Tri des structures :

<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")

dfTop.show</code>

API DataSet :

Étincelle 1.6 :

<code>case class Record(Hour: Integer, Category: String, TotalValue: Double)

df.as[Record]
  .groupBy($"Hour")
  .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
  .show</code>

Spark 2.0 ou supérieur :

<code>df.as[Record]
  .groupByKey(_.Hour)
  .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)</code>

Ces méthodes offrent plusieurs façons de sélectionner la première ligne de chaque groupe en fonction de critères de tri spécifiés. Le choix de la méthode dépend des besoins spécifiques et des considérations de performances.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn