Heim >Datenbank >MySQL-Tutorial >Wie wähle ich effizient die erste Zeile jeder Gruppe in einem Spark-DataFrame aus?

Wie wähle ich effizient die erste Zeile jeder Gruppe in einem Spark-DataFrame aus?

Susan Sarandon
Susan SarandonOriginal
2025-01-23 13:06:10354Durchsuche

How to Efficiently Select the First Row of Each Group in a Spark DataFrame?

Wie kann ich die erste Zeile jeder Gruppierung in Spark DataFrame effizient auswählen?

Bei einem gegebenen DataFrame mit den Spalten „Stunde“, „Kategorie“ und „Gesamtwert“ besteht die Aufgabe darin, die erste Zeile jeder Gruppierung auszuwählen, wobei jede Gruppierung durch eine eindeutige Kombination aus „Stunde“ und „Kategorie“ definiert wird.

Fensterfunktionen verwenden

<code>import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")</code>

Verwenden Sie einfaches SQL, um nach der Aggregation beizutreten

<code>val dfMax = df.groupBy($"Hour").agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(dfMax, ($"Hour" === dfMax("Hour")) && ($"TotalValue" === dfMax("max_value"))).drop(dfMax("Hour")).drop(dfMax("max_value"))</code>

Strukturelle Sortierung verwenden

<code>val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"Hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")</code>

Verwenden Sie die DataSet-API (Spark 1.6, 2.0)

<code>import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

case class Record(Hour: Integer, Category: String, TotalValue: Double)

def firstOfHour[T: Encoder : Aggregator]: TypedColumn[Record, Record] = {
  aggregator[Record, (Option[Record], Long)](Record(Hour = 0, Category = null, TotalValue = 0.0)) { (buffer, record) =>
    if (record.Hour > buffer._2) buffer else (Some(record), record.Hour)
  } { (buffer1, buffer2) =>
    if (buffer1._2 > buffer2._2) buffer1 else buffer2
  } { x =>
    x._1 match {
      case Some(r) => r
      case _ => Record(Hour = 0, Category = "0", TotalValue = 0.0)
    }
  }
}

df.as[Record].groupByKey(_.Hour).agg(firstOfHour[Record]).show</code>

Das obige ist der detaillierte Inhalt vonWie wähle ich effizient die erste Zeile jeder Gruppe in einem Spark-DataFrame aus?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn