Spark SQL視窗函數及複雜條件
假設您有一個包含使用者登入詳細資訊的DataFrame,並且您想要新增一列來指示他們在網站上的啟用日期。但是,有一個需要注意的地方:使用者的活動期限在一段時間後到期,再次登入會重置他們的啟動日期。
這個問題可以使用Spark SQL中的視窗函數來解決。以下是一種方法:
步驟1:定義視窗
<code>import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ val userWindow = Window.partitionBy("user_name").orderBy("login_date") val userSessionWindow = Window.partitionBy("user_name", "session")</code>
步驟2:偵測新會話的開始
<code>val newSession = (coalesce( datediff($"login_date", lag($"login_date", 1).over(userWindow)), lit(0) ) > 5).cast("bigint") val sessionized = df.withColumn("session", sum(newSession).over(userWindow))</code>
步驟3:找出每個會話的最早日期
<code>val result = sessionized .withColumn("became_active", min($"login_date").over(userSessionWindow)) .drop("session")</code>
此方法使用滑動視窗按使用者對資料進行分區,並按登入日期進行排序。然後透過對具有相同會話ID的行進行分組來定義會話視窗。透過偵測新會話何時開始(newSession)並計算每個會話中最早的登入日期(became_active),可以實現所需的結果。
最新的Spark改進
對於Spark 3.2及更高版本,原生支援會話窗口,使上述解決方案更加簡單。有關詳細信息,請參閱官方文檔。
以上是Spark SQL 視窗函數如何透過基於會話的過期來決定使用者啟動日期?的詳細內容。更多資訊請關注PHP中文網其他相關文章!