Maison  >  Article  >  Java  >  Comment utiliser le déclencheur de fenêtre Java Flink Trigger

Comment utiliser le déclencheur de fenêtre Java Flink Trigger

王林
王林avant
2023-05-03 13:10:101594parcourir

Definition

Trigger détermine quand la fenêtre (formée par l'allocateur de fenêtre) est prête à être traitée par la fonction window. Chaque WindowAssigner est livré avec une valeur par défaut Trigger. Si le déclencheur par défaut ne répond pas à vos besoins, vous pouvez utiliser trigger(…).

Code source du déclencheur

public abstract class Trigger<T, W extends Window> implements Serializable {
	/**
	 只要有元素落⼊到当前窗⼝, 就会调⽤该⽅法
	 * @param element 收到的元素
	 * @param timestamp 元素抵达时间.
	 * @param window 元素所属的window窗口.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;
	
	 /**
	 * processing-time 定时器回调函数
	 *
	 * @param time 定时器触发的时间.
	 * @param window 定时器触发的窗口对象.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	/**
	 * event-time 定时器回调函数
	 *
	 * @param time 定时器触发的时间.
	 * @param window 定时器触发的窗口对象.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	 /**
	 * 当 多个窗口合并到⼀个窗⼝的时候,调用该方法法,例如系统SessionWindow
	 *
	 * @param window 合并后的新窗口对象
	 * @param ctx ⼀个上下⽂对象,通常用该对象注册 timer(ProcessingTime/EventTime)回调以及访问状态
	 */
    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
	
	/**
	 * 当窗口被删除后执⾏所需的任何操作。例如:可以清除定时器或者删除状态数据
	 */
    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
    }

Code source du déclencheurResult

public enum TriggerResult {
	// 表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
    CONTINUE(false, false),
    // 触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。
    FIRE_AND_PURGE(true, true),
    // 触发窗口计算,但是保留窗口元素
    FIRE(true, false),
    // 不触发窗口计算,丢弃窗口,并且删除窗口的元素。
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    private TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return this.fire;
    }

    public boolean isPurge() {
        return this.purge;
    }
}

Une fois que le déclencheur détermine que la fenêtre est prête à être traitée, elle se déclenchera et le le statut de retour peut être FIRE ou FIRE_AND_PURGE. Parmi eux, FIRE déclenche le calcul de la fenêtre et conserve le contenu de la fenêtre, tandis que FIRE_AND_PURGE déclenche le calcul de la fenêtre et supprime le contenu de la fenêtre. Par défaut, les déclencheurs pré-implémentés déclenchent simplement FIRE sans effacer l'état de la fenêtre.

Flink preset Trigger

  • EventTimeTrigger : Déterminez s'il faut déclencher le calcul de la fenêtre en comparant EventTime avec l'heure de fin de la fenêtre si EventTime est supérieur à. Window EndTime, Trigger, sinon il ne se déclenchera pas et la fenêtre continuera à attendre.

  • ProcessTimeTrigger : Déterminez s'il faut déclencher la fenêtre en comparant ProcessTime et window EndTme. Si ProcessTime est supérieur à EndTime, le calcul est déclenché, sinon la fenêtre continue d'attendre.

  • ContinuousEventTimeTrigger : calculé en fonction de la fenêtre de déclenchement périodique de l'intervalle ou de l'heure de fin de la fenêtre est inférieure à la fenêtre de déclenchement EndTime actuelle.

  • ContinuousProcessingTimeTrigger : calculé en fonction de la fenêtre de déclenchement périodique de l'intervalle ou de l'heure de fin de la fenêtre est inférieure à la fenêtre de déclenchement ProcessTime actuelle.

  • CountTrigger : Déterminez s'il faut déclencher le calcul de la fenêtre en fonction du fait que la quantité de données accédées dépasse le seuil défini.

  • DeltaTrigger : Le fait que l'indicateur Delta calculé en fonction des données d'accès dépasse le seuil spécifié est utilisé pour déterminer s'il faut déclencher le calcul de la fenêtre.

  • PurgingTrigger : N'importe quel déclencheur peut être converti en déclencheur de type Purge en tant que paramètre, et les données seront nettoyées une fois le calcul terminé.

  • NeverTrigger : Ne jamais déclencher le calcul de fenêtre à aucun moment

Comment utiliser le déclencheur de fenêtre Java Flink Trigger

# 🎜🎜# Regardez principalement le code source de EventTimeTrigger et ProcessingTimeTrigger.

EventTimeTrigger code source

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

ProcessingTimeTrigger code source

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

Dans la méthode onElement(), ctx.registerProcessingTimeTimer(window.maxTimestamp()) sera être enregistré Un minuteur ProcessingTime, le paramètre time est window.maxTimestamp(), qui est l'heure finale de la fenêtre. Lorsque l'heure atteint l'heure finale de la fenêtre, le minuteur se déclenche et appelle la méthode onProcessingTime() dans onProcessingTime. (), renvoie TriggerResult.FIRE Autrement dit, renvoie FIRE, déclenchant le calcul des données dans la fenêtre, mais en conservant les éléments de la fenêtre.

Il est à noter que la classe ProcessingTimeTrigger ne déclenchera le calcul de la fonction fenêtre que lorsque l'heure finale de la fenêtre arrivera. Une fois le calcul terminé, les données de la fenêtre ne seront pas effacées. Ces données sont stockées en mémoire sauf si vous appelez PURGE ou FIRE_AND_PURGE, sinon les données resteront en mémoire. En fait, aucune des classes Trigger fournies dans Flink, à l'exception de la classe PurgingTrigger, n'effacera les données de la fenêtre.

Déclencheur de fenêtre commun

Fenêtre roulante

TumblingEventTimeWindows :EventTimeTrigger
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
}

TumblingProcessingTimeWindows: ProcessingTimeTrigger

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

Fenêtre coulissante#🎜 🎜#
SlidingEventTimeWindows:EventTimeTrigger
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}
🎜🎜 #
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
}

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer