Maison >base de données >tutoriel mysql >Comment sélectionner efficacement la première ligne de chaque groupe avec la valeur la plus élevée dans un Spark DataFrame ?

Comment sélectionner efficacement la première ligne de chaque groupe avec la valeur la plus élevée dans un Spark DataFrame ?

Susan Sarandon
Susan Sarandonoriginal
2025-01-23 13:02:16707parcourir

How to Efficiently Select the First Row of Each Group with the Highest Value in a Spark DataFrame?

Comment choisir la page d'accueil de chaque groupe

Le but est d'extraire la première page avec la valeur la plus élevée de chaque groupe du DataFrame. C'est souvent le cas lors de l'analyse des tendances des données ou de l'identification des meilleurs acteurs dans une catégorie spécifique. Pour y parvenir, plusieurs méthodes peuvent être utilisées :

Fonction fenêtre :

Les fonctions de fenêtre permettent d'effectuer des calculs au sein d'un groupe. Dans ce cas, nous pouvons utiliser la fonction row_number() pour attribuer un numéro de séquence à chaque ligne en fonction de l'ordre spécifié. La page d'accueil peut ensuite être identifiée en filtrant le classement à 1.

<code class="language-scala">import org.apache.spark.sql.functions.{row_number, max, broadcast}
import org.apache.spark.sql.expressions.Window

val df = sc.parallelize(Seq(
  (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
  (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
  (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
  (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")

val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")

dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// |   0|   cat26|      30.9|
// |   1|   cat67|      28.5|
// |   2|   cat56|      39.6|
// |   3|    cat8|      35.6|
// +----+--------+----------+</code>

Jointure simple post-agrégation SQL :

Alternativement, nous pouvons utiliser SQL pour effectuer l'agrégation, puis concaténer les résultats avec le DataFrame d'origine pour extraire la première page de chaque groupe.

<code class="language-scala">val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(broadcast(dfMax),
    ($"hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")

dfTopByJoin.show

// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// |   0|   cat26|      30.9|
// |   1|   cat67|      28.5|
// |   2|   cat56|      39.6|
// |   3|    cat8|      35.6|
// +----+--------+----------+</code>

Utiliser le tri par structure :

Un moyen intéressant d'obtenir les mêmes résultats sans utiliser de fonctions de fenêtre ou de jointures consiste à trier les données en fonction d'une structure contenant des valeurs et des catégories. La valeur maximale de cette structure renverra alors la première page souhaitée pour chaque groupe.

<code class="language-scala">val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")

dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// |   0|   cat26|      30.9|
// |   1|   cat67|      28.5|
// |   2|   cat56|      39.6|
// |   3|    cat8|      35.6|
// +----+--------+----------+</code>

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn