此範例示範如何使用 Spark 視窗函數來決定使用者啟動狀態,考慮 5 天的活動期,該期在後續登入時重置。 我們獲得了一個用戶登入的 DataFrame,目的是添加一個列來顯示每個用戶何時變得活躍。
方法:利用視窗函數
我們的方法使用視窗函數來識別觸發活動狀態重置的登入事件。 將建立一個視窗來按使用者和日期排序登入。 滯後此視窗可以比較目前和先前的登入時間。
<code class="language-scala">import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ val window = Window.partitionBy("user_name").orderBy("login_date") val df2 = df.withColumn("previous_login", lag("login_date", 1).over(window))</code>
解決活動狀態重置
became_active
日期是透過檢查上次登入 ( previous_login
) 是否在 5 天的活躍期內來確定的。如果previous_login
為空(首次登入)或時間差(login_date
- previous_login
)小於5天,則became_active
設定為目前login_date
。否則,該過程將遞歸地繼續,直到滿足此條件。
Spark 實作(版本 >= 3.2)
Spark 3.2 及更高版本提供本機會話窗口支持,簡化了此任務(有關詳細信息,請參閱官方文檔)。
Spark 實作(舊版)
對於較舊的 Spark 版本,需要解決方法:
<code class="language-scala">val userWindow = Window.partitionBy("user_name").orderBy("login_date") val userSessionWindow = Window.partitionBy("user_name", "session")</code>
<code class="language-scala">val newSession = (coalesce( datediff($"login_date", lag($"login_date", 1).over(userWindow)), lit(0) ) > 5).cast("bigint")</code>
<code class="language-scala">val sessionized = df.withColumn("session", sum(newSession).over(userWindow)) val result = sessionized .withColumn("became_active", min($"login_date").over(userSessionWindow)) .drop("session")</code>
範例輸出
以下輸出示範了使用範例資料集的結果:
<code>+----------------+----------+-------------+ | user_name|login_date|became_active| +----------------+----------+-------------+ | OprahWinfreyJr|2012-01-10| 2012-01-10| |SirChillingtonIV|2012-01-04| 2012-01-04| |SirChillingtonIV|2012-01-11| 2012-01-11| |SirChillingtonIV|2012-01-14| 2012-01-11| |SirChillingtonIV|2012-08-11| 2012-08-11| |Booooooo99900098|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-06| 2012-01-04| +----------------+----------+-------------+</code>
以上是視窗函數如何透過 5 天活動期重置來識別使用者啟動狀態?的詳細內容。更多資訊請關注PHP中文網其他相關文章!