Trigger bestimmt, wann das Fenster (das vom Fensterzuordner gebildet wird) für die Verarbeitung durch die Fensterfunktion bereit ist. Jeder WindowAssigner verfügt über einen Standardwert-Trigger. Wenn der Standardtrigger Ihren Anforderungen nicht entspricht, können Sie Trigger(…) verwenden.
public abstract class Trigger<T, W extends Window> implements Serializable { /** 只要有元素落⼊到当前窗⼝, 就会调⽤该⽅法 * @param element 收到的元素 * @param timestamp 元素抵达时间. * @param window 元素所属的window窗口. * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调. */ public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception; /** * processing-time 定时器回调函数 * * @param time 定时器触发的时间. * @param window 定时器触发的窗口对象. * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调. */ public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * event-time 定时器回调函数 * * @param time 定时器触发的时间. * @param window 定时器触发的窗口对象. * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调. */ public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception; /** * 当 多个窗口合并到⼀个窗⼝的时候,调用该方法法,例如系统SessionWindow * * @param window 合并后的新窗口对象 * @param ctx ⼀个上下⽂对象,通常用该对象注册 timer(ProcessingTime/EventTime)回调以及访问状态 */ public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } /** * 当窗口被删除后执⾏所需的任何操作。例如:可以清除定时器或者删除状态数据 */ public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception; }
public enum TriggerResult { // 表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。 CONTINUE(false, false), // 触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。 FIRE_AND_PURGE(true, true), // 触发窗口计算,但是保留窗口元素 FIRE(true, false), // 不触发窗口计算,丢弃窗口,并且删除窗口的元素。 PURGE(false, true); private final boolean fire; private final boolean purge; private TriggerResult(boolean fire, boolean purge) { this.purge = purge; this.fire = fire; } public boolean isFire() { return this.fire; } public boolean isPurge() { return this.purge; } }
Sobald der Trigger feststellt, dass das Fenster zur Verarbeitung bereit ist, wird es ausgelöst und der Rückgabestatus kann FIRE oder FIRE_AND_PURGE sein. Unter anderem löst FIRE die Fensterberechnung aus und behält den Fensterinhalt bei, während FIRE_AND_PURGE die Fensterberechnung auslöst und den Fensterinhalt löscht. Standardmäßig FEUERN vorimplementierte Trigger einfach, ohne den Fensterstatus zu löschen.
EventTimeTrigger: Bestimmen Sie, ob die Fensterberechnung ausgelöst werden soll, indem Sie die EventTime und die Endtime des Fensters vergleichen. Andernfalls wird sie nicht ausgelöst Das Fenster wartet weiterhin.
ProcessTimeTrigger: Bestimmen Sie, ob das Fenster ausgelöst werden soll, indem Sie ProcessTime und Window EndTme vergleichen. Wenn ProcessTime größer als EndTime ist, wird die Berechnung ausgelöst, andernfalls wartet das Fenster weiter.
ContinuousEventTimeTrigger: Wird basierend auf dem periodischen Triggerfenster des Intervalls berechnet oder die Endzeit des Fensters ist kleiner als das aktuelle EndTime-Triggerfenster.
ContinuousProcessingTimeTrigger: Berechnet basierend auf dem Intervall des periodischen Triggerfensters oder der Endzeit des Fensters, die kleiner als das aktuelle ProcessTime-Triggerfenster ist.
CountTrigger: Bestimmen Sie, ob die Fensterberechnung basierend darauf ausgelöst werden soll, ob die Menge der abgerufenen Daten den festgelegten Schwellenwert überschreitet.
DeltaTrigger: Bestimmen Sie, ob die Fensterberechnung basierend darauf ausgelöst werden soll, ob der aus den Zugriffsdaten berechnete Delta-Indikator den angegebenen Schwellenwert überschreitet.
PurgingTrigger: Jeder Trigger kann als Parameter in einen Purge-Trigger umgewandelt werden, und die Daten werden nach Abschluss der Berechnung bereinigt.
NeverTrigger: Fensterberechnung zu keinem Zeitpunkt auslösen
Schauen Sie sich hauptsächlich den Quellcode von EventTimeTrigger und ProcessingTimeTrigger an.
public class EventTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private EventTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } public String toString() { return "EventTimeTrigger()"; } public static EventTimeTrigger create() { return new EventTimeTrigger(); } }
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private ProcessingTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE; } public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } public String toString() { return "ProcessingTimeTrigger()"; } public static ProcessingTimeTrigger create() { return new ProcessingTimeTrigger(); } }
In der onElement()-Methode registriert ctx.registerProcessingTimeTimer(window.maxTimestamp()) einen ProcessingTime-Timer, und der Zeitparameter ist window.maxTimestamp() Die letzte Zeit des Fensters: Wenn die Zeit die letzte Zeit dieses Fensters erreicht, löst der Timer die Methode onProcessingTime() aus und ruft sie auf. In der Methode onProcessingTime() gibt return TriggerResult.FIRE FIRE zurück und löst die Berechnung der Daten im Fenster aus , jedoch unter Beibehaltung der Fensterelemente.
Es ist zu beachten, dass die ProcessingTimeTrigger-Klasse die Berechnung der Fensterfunktion erst auslöst, wenn die endgültige Zeit des Fensters erreicht ist. Nach Abschluss der Berechnung werden die Daten im Fenster nicht gelöscht es sei denn, PURGE oder FIRE_AND_PURGE wird aufgerufen. Die Daten bleiben immer im Speicher. Tatsächlich löscht keine der in Flink bereitgestellten Trigger-Klassen außer der PurgingTrigger-Klasse die Daten im Fenster.
TumblingEventTimeWindows :EventTimeTrigger public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
TumblingProcessingTimeWindows: ProcessingTimeTrigger
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
SlidingEventTimeWindows:EventTimeTrigger public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
SlidingProcessingTimeWindows: ProcessingTimeTrigger
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
EventTimeSessionWindows:EventTimeTrigger public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } }
ProcessingTimeSessionWindows: ProcessingTimeTrigger
public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> { public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } }
GlobalWindows :NeverTrigger public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return new GlobalWindows.NeverTrigger(); } }
Das obige ist der detaillierte Inhalt vonSo verwenden Sie den Java Flink-Fensterauslöser Trigger. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!