Maison >base de données >tutoriel mysql >Comment les fonctions de fenêtre peuvent-elles identifier l'état d'activation de l'utilisateur avec une réinitialisation de la période active de 5 jours ?
Cet exemple montre comment utiliser les fonctions de la fenêtre Spark pour déterminer l'état d'activation de l'utilisateur, en considérant une période active de 5 jours qui se réinitialise lors des connexions suivantes. Nous recevons un DataFrame de connexions utilisateur et visons à ajouter une colonne indiquant quand chaque utilisateur est devenu actif.
Méthodologie : exploiter les fonctions de fenêtre
Notre approche utilise des fonctions de fenêtre pour identifier les événements de connexion déclenchant une réinitialisation du statut actif. Une fenêtre est créée pour classer les connexions par utilisateur et par date. Le retard de cette fenêtre permet de comparer les heures de connexion actuelles et précédentes.
<code class="language-scala">import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ val window = Window.partitionBy("user_name").orderBy("login_date") val df2 = df.withColumn("previous_login", lag("login_date", 1).over(window))</code>
Résolution des réinitialisations d'état actif
La date became_active
est déterminée en vérifiant si la connexion précédente ( previous_login
) tombe dans la période active de 5 jours. Si previous_login
est nul (première connexion) ou si le décalage horaire (login_date
- previous_login
) est inférieur à 5 jours, became_active
est défini sur le login_date
actuel. Sinon, le processus continue de manière récursive jusqu'à ce que cette condition soit remplie.
Implémentation Spark (Versions >= 3.2)
Spark 3.2 et versions ultérieures offrent une prise en charge native des fenêtres de session, simplifiant cette tâche (voir la documentation officielle pour plus de détails).
Implémentation Spark (anciennes versions)
Pour les anciennes versions de Spark, une solution de contournement est nécessaire :
<code class="language-scala">val userWindow = Window.partitionBy("user_name").orderBy("login_date") val userSessionWindow = Window.partitionBy("user_name", "session")</code>
<code class="language-scala">val newSession = (coalesce( datediff($"login_date", lag($"login_date", 1).over(userWindow)), lit(0) ) > 5).cast("bigint")</code>
<code class="language-scala">val sessionized = df.withColumn("session", sum(newSession).over(userWindow)) val result = sessionized .withColumn("became_active", min($"login_date").over(userSessionWindow)) .drop("session")</code>
Exemple de sortie
Le résultat suivant montre le résultat à l'aide d'un exemple d'ensemble de données :
<code>+----------------+----------+-------------+ | user_name|login_date|became_active| +----------------+----------+-------------+ | OprahWinfreyJr|2012-01-10| 2012-01-10| |SirChillingtonIV|2012-01-04| 2012-01-04| |SirChillingtonIV|2012-01-11| 2012-01-11| |SirChillingtonIV|2012-01-14| 2012-01-11| |SirChillingtonIV|2012-08-11| 2012-08-11| |Booooooo99900098|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-06| 2012-01-04| +----------------+----------+-------------+</code>
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!