トリガーは、ウィンドウ (ウィンドウ アロケーターによって形成された) がウィンドウ関数によって処理される準備が整ったときを決定します。各 WindowAssigner にはデフォルト値の Trigger が付属しています。デフォルトのトリガーがニーズを満たさない場合は、trigger(…) を使用できます。
public abstract class Trigger<T, W extends Window> implements Serializable { /** 只要有元素落⼊到当前窗⼝, 就会调⽤该⽅法 * @param element 收到的元素 * @param timestamp 元素抵达时间. * @param window 元素所属的window窗口. * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调. */ public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception; /** * processing-time 定时器回调函数 * * @param time 定时器触发的时间. * @param window 定时器触发的窗口对象. * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调. */ public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * event-time 定时器回调函数 * * @param time 定时器触发的时间. * @param window 定时器触发的窗口对象. * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调. */ public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * 当 多个窗口合并到⼀个窗⼝的时候,调用该方法法,例如系统SessionWindow * * @param window 合并后的新窗口对象 * @param ctx ⼀个上下⽂对象,通常用该对象注册 timer(ProcessingTime/EventTime)回调以及访问状态 */ public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } /** * 当窗口被删除后执⾏所需的任何操作。例如:可以清除定时器或者删除状态数据 */ public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception; }
public enum TriggerResult { // 表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。 CONTINUE(false, false), // 触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。 FIRE_AND_PURGE(true, true), // 触发窗口计算,但是保留窗口元素 FIRE(true, false), // 不触发窗口计算,丢弃窗口,并且删除窗口的元素。 PURGE(false, true); private final boolean fire; private final boolean purge; private TriggerResult(boolean fire, boolean purge) { this.purge = purge; this.fire = fire; } public boolean isFire() { return this.fire; } public boolean isPurge() { return this.purge; } }
ウィンドウが処理の準備ができているとトリガーが判断すると、トリガーが起動され、戻りステータスは FIRE またはFIRE_AND_PURGE。このうち、FIRE はウィンドウ計算をトリガーしてウィンドウの内容を保持し、FIRE_AND_PURGE はウィンドウ計算をトリガーしてウィンドウの内容を削除します。デフォルトでは、事前実装されたトリガーはウィンドウ状態をクリアせずに単に FIRE します。
EventTimeTrigger: EventTime とウィンドウの Endtime を比較して、ウィンドウ計算をトリガーするかどうかを決定します。EventTime が Window EndTime より大きい場合、トリガーされます。そうでない場合はトリガーされず、ウィンドウは待機し続けます。
ProcessTimeTrigger: ProcessTime とウィンドウ EndTme を比較して、ウィンドウをトリガーするかどうかを決定します。ProcessTime が EndTime より大きい場合、計算がトリガーされ、それ以外の場合、ウィンドウは待機し続けます。
ContinuousEventTimeTrigger: 間隔周期トリガー ウィンドウ、またはウィンドウの終了時間が現在の EndTime トリガー ウィンドウより小さいことに基づいて計算されます。
ContinuousProcessingTimeTrigger: 間隔周期トリガー ウィンドウに基づいて計算されるか、ウィンドウの終了時間が現在の ProcessTime トリガー ウィンドウよりも短いです。
CountTrigger: アクセスされたデータの量が設定されたしきい値を超えるかどうかに基づいて、ウィンドウ計算をトリガーするかどうかを決定します。
DeltaTrigger: アクセス データから計算されたデルタ インジケーターが指定されたしきい値を超えるかどうかに基づいて、ウィンドウ計算をトリガーするかどうかを決定します。
PurgingTrigger: 任意のトリガーをパラメーターとしてパージ タイプのトリガーに変換でき、計算の完了後にデータがクリーンアップされます。
NeverTrigger: いつでもウィンドウ計算をトリガーしない
主に次のソース コードを確認します。 EventTimeTrigger と ProcessingTimeTrigger 。
public class EventTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private EventTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } public String toString() { return "EventTimeTrigger()"; } public static EventTimeTrigger create() { return new EventTimeTrigger(); } }
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private ProcessingTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE; } public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } public String toString() { return "ProcessingTimeTrigger()"; } public static ProcessingTimeTrigger create() { return new ProcessingTimeTrigger(); } }
onElement() メソッドでは、ctx.registerProcessingTimeTimer(window.maxTimestamp()) が ProcessingTime タイマー、時間を登録します。パラメータは window.maxTimestamp() で、これはウィンドウの最終時刻です。時間がウィンドウの最終時刻に達すると、タイマーがトリガーされ、onProcessingTime() メソッドが呼び出されます。onProcessingTime() メソッドでは、TriggerResult を返します。 FIRE は FIRE を返し、ウィンドウをトリガーします。ウィンドウ内のデータが計算されますが、ウィンドウ要素は保持されます。
ProcessingTimeTrigger クラスは、ウィンドウの最終時刻に達したときにのみウィンドウ関数の計算をトリガーすることに注意してください。計算が完了した後、ウィンドウ内のデータはクリアされません。 PURGE または FIRE_AND_PURGE が呼び出されない限り、データはメモリに保存されます。それ以外の場合、データは常にメモリ内にあります。実際、Flink で提供される Trigger クラスは、PurgingTrigger クラスを除き、ウィンドウ内のデータをクリアしません。
TumblingEventTimeWindows :EventTimeTrigger public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
TumbleProcessingTimeWindows:ProcessingTimeTrigger
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
SlidingEventTimeWindows:EventTimeTrigger public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
SlidingProcessingTimeWindows:ProcessingTimeTrigger
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
EventTimeSessionWindows:EventTimeTrigger public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
ProcessingTimeSessionWindows: ProcessingTimeTrigger
public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
GlobalWindows :NeverTrigger public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return new GlobalWindows.NeverTrigger(); } }
以上がJava Flinkウィンドウトリガーの使い方 トリガーの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。