Rumah  >  Artikel  >  Java  >  Cara menggunakan pencetus tetingkap Java Flink Pencetus

Cara menggunakan pencetus tetingkap Java Flink Pencetus

王林
王林ke hadapan
2023-05-03 13:10:101570semak imbas

Definisi

Pencetus menentukan apabila tetingkap (dibentuk oleh pengagih tetingkap) sedia untuk diproses oleh fungsi tetingkap. Setiap WindowAssigner disertakan dengan Trigger nilai lalai. Jika pencetus lalai tidak memenuhi keperluan anda, anda boleh menggunakan pencetus(…).

Kod sumber pencetus

public abstract class Trigger<T, W extends Window> implements Serializable {
	/**
	 只要有元素落⼊到当前窗⼝, 就会调⽤该⽅法
	 * @param element 收到的元素
	 * @param timestamp 元素抵达时间.
	 * @param window 元素所属的window窗口.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;
	
	 /**
	 * processing-time 定时器回调函数
	 *
	 * @param time 定时器触发的时间.
	 * @param window 定时器触发的窗口对象.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	/**
	 * event-time 定时器回调函数
	 *
	 * @param time 定时器触发的时间.
	 * @param window 定时器触发的窗口对象.
	 * @param ctx ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
	 */
    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	 /**
	 * 当 多个窗口合并到⼀个窗⼝的时候,调用该方法法,例如系统SessionWindow
	 *
	 * @param window 合并后的新窗口对象
	 * @param ctx ⼀个上下⽂对象,通常用该对象注册 timer(ProcessingTime/EventTime)回调以及访问状态
	 */
    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
	
	/**
	 * 当窗口被删除后执⾏所需的任何操作。例如:可以清除定时器或者删除状态数据
	 */
    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
    }

Kod sumber TriggerResult

public enum TriggerResult {
	// 表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
    CONTINUE(false, false),
    // 触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。
    FIRE_AND_PURGE(true, true),
    // 触发窗口计算,但是保留窗口元素
    FIRE(true, false),
    // 不触发窗口计算,丢弃窗口,并且删除窗口的元素。
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    private TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return this.fire;
    }

    public boolean isPurge() {
        return this.purge;
    }
}

Setelah pencetus menentukan bahawa tetingkap sedia untuk diproses, ia akan menyala, dan status pemulangan boleh menjadi FIRE atau FIRE_AND_PURGE. Antaranya, FIRE mencetuskan pengiraan tetingkap dan mengekalkan kandungan tetingkap, manakala FIRE_AND_PURGE mencetuskan pengiraan tetingkap dan memadam kandungan tetingkap. Secara lalai, pencetus pralaksana hanya FIRE tanpa mengosongkan keadaan tetingkap.

Pratetap Flink

  • EventTimeTrigger: Tentukan sama ada untuk mencetuskan pengiraan tetingkap dengan membandingkan EventTime dengan Endtime tetingkap Jika EventTime lebih besar daripada Window EndTime, ia akan mencetuskan , jika tidak, ia tidak akan mencetuskan tetingkap akan terus menunggu.

  • ProcessTimeTrigger: Tentukan sama ada untuk mencetuskan tetingkap dengan membandingkan ProcessTime dan tetingkap EndTme Jika ProcessTime lebih besar daripada EndTime, pengiraan akan dicetuskan, jika tidak, tetingkap akan terus menunggu.

  • ContinuousEventTimeTrigger: Mengira tetingkap pencetus secara berkala berdasarkan selang atau masa tamat Tetingkap adalah kurang daripada tetingkap pencetus Masa Akhir semasa.

  • ContinuousProcessingTimeTrigger: Dikira berdasarkan tetingkap pencetus berkala selang atau masa tamat Tetingkap adalah kurang daripada tetingkap pencetus ProcessTime semasa.

  • CountTrigger: Tentukan sama ada untuk mencetuskan pengiraan tetingkap berdasarkan sama ada jumlah data yang diakses melebihi ambang yang ditetapkan.

  • DeltaTrigger: Tentukan sama ada untuk mencetuskan pengiraan tetingkap berdasarkan sama ada penunjuk Delta yang dikira daripada data capaian melebihi Ambang yang ditentukan.

  • PurgingTrigger: Sebarang pencetus boleh ditukar menjadi pencetus jenis Purge sebagai parameter dan data akan dibersihkan selepas pengiraan selesai.

  • NeverTrigger: Jangan cetuskan pengiraan tetingkap pada bila-bila masa

Cara menggunakan pencetus tetingkap Java Flink Pencetus

Terutamanya lihat pada kod sumber EventTimeTrigger dan ProcessingTimeTrigger .

Kod sumber EventTimeTrigger

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

Kod sumber ProcessingTimeTrigger

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

Dalam kaedah onElement(), ctx.registerProcessingTimeTimer(window.maxTimestamp()) akan mendaftarkan pemasa ProcessingTime Parameternya ialah window.maxTimestamp(), iaitu masa terakhir tetingkap Apabila masa mencapai masa akhir tetingkap, pemasa mencetus dan memanggil kaedah onProcessingTime() Dalam kaedah onProcessingTime(), kembalikan TriggerResult. FIRE mengembalikan FIRE, mencetuskan Pengiraan data dalam tetingkap, tetapi elemen tetingkap akan dikekalkan.

Perlu diingat bahawa kelas ProcessingTimeTrigger hanya akan mencetuskan pengiraan fungsi tetingkap apabila masa akhir tetingkap tiba Selepas pengiraan selesai, data dalam tetingkap tidak akan dikosongkan disimpan dalam ingatan melainkan PURGE dipanggil atau FIRE_AND_PURGE, jika tidak, data akan sentiasa berada dalam ingatan. Malah, tiada kelas Trigger yang disediakan dalam Flink, kecuali kelas PurgingTrigger, akan mengosongkan data dalam tetingkap.

Pencetus untuk tetingkap biasa

Tetingkap menatal

TumblingEventTimeWindows :EventTimeTrigger
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
}

TumblingProcessingTimeWindows: ProcessingTimeTrigger

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

Tetingkap gelongsor

rreeeWindowsProcessingTimewindows
SlidingEventTimeWindows:EventTimeTrigger
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

Tetingkap Sesi

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
}

ProcessingTimeSessionWindows: ProcessingTimeTrigger

EventTimeSessionWindows:EventTimeTrigger
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

Global Window

public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

Atas ialah kandungan terperinci Cara menggunakan pencetus tetingkap Java Flink Pencetus. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam