Rumah  >  Artikel  >  hujung hadapan web  >  Mengulang Asynchronously Over Event Emitters dalam TypeScript dengan Async Generators

Mengulang Asynchronously Over Event Emitters dalam TypeScript dengan Async Generators

Barbara Streisand
Barbara Streisandasal
2024-09-30 06:26:39970semak imbas

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

pengenalan

Dalam pembangunan web moden, kami sering berurusan dengan acara, sama ada mengendalikan mesej WebSocket masuk, acara dihantar pelayan (SSE) atau strim data daripada perkhidmatan seperti Redis Pub/Sub. Walaupun Node.js menyediakan keupayaan dipacu peristiwa, ia tidak mempunyai cara yang luar biasa untuk tidak segerak melelakan acara menggunakan untuk menunggu...gelung.

Dalam siaran ini, saya akan membimbing anda melalui cara yang mudah tetapi berkesan untuk mencipta lelaran acara tak segerak menggunakan TypeScript dan AsyncGenerator. Pendekatan ini direka bentuk untuk membolehkan anda menggunakan peristiwa daripada sebarang jenis pemancar peristiwa dengan cara yang bersih dan boleh diramal, dengan kawalan penuh ke atas logik pembatalan dan pembersihan.

Kes Penggunaan: Redis Pub/Sub

Dalam salah satu projek saya baru-baru ini, saya perlu mendengar saluran Redis Pub/Sub dan menghantar acara dihantar pelayan (SSE) secara tidak segerak kepada pelanggan yang disambungkan. Cabarannya ialah mengendalikan acara masuk tanpa membebankan sistem sambil membenarkan pengguna membatalkan strim acara pada bila-bila masa.

Penyelesaian? Penyalur peristiwa yang menukarkan mana-mana pemancar peristiwa (seperti Redis Pub/Sub) kepada lelaran tak segerak. Ini membolehkan kami memproses acara dengan cara terkawal dan mengendalikan pembatalan dengan anggun apabila perlu.

Mari kita mendalami pelaksanaannya.

Kod

export type Context<T> = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise<void>;

export type Subscriber<T> = (
    context: Context<T>,
) => void | CleanupFn | Promise<CleanupFn | void>;

export async function* createEventIterator<T>(
    subscriber: Subscriber<T>,
): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

Bagaimana Ia Berfungsi

Fungsi ini menerima fungsi pelanggan yang boleh anda sambungkan ke mana-mana pemancar acara atau sistem pub/sub. Pelanggan menyediakan dua kaedah penting:

  1. pancarkan: Membenarkan pelanggan untuk menolak acara baharu ke dalam lelaran.
  2. batal: Menyediakan cara untuk memberi isyarat bahawa lelaran harus berhenti.

Fungsi ini mengembalikan AsyncGenerator, membolehkan anda mengulangi peristiwa menggunakan gelung for awaiit...of.

Memecahkan Kod

  1. Objek Konteks:
    Konteks type menyediakan antara muka untuk memancarkan acara baharu atau membatalkan langganan. Pelanggan menggunakan konteks ini untuk mengawal aliran acara.

  2. Barisan Acara:
    Peristiwa: Tatasusunan T[] berfungsi sebagai penimbal untuk menyimpan peristiwa yang dipancarkan. Penjana akan memproses peristiwa ini satu demi satu. Jika tiada acara dalam baris gilir, ia akan menunggu untuk acara seterusnya dipancarkan.

  3. Pancarkan Logik:
    Fungsi emit menambah acara baharu pada baris gilir dan menyelesaikan sebarang janji yang belum selesai (iaitu, jika penjana sedang menunggu acara baharu).

  4. Pembatalan:
    Jika fungsi batal dipanggil, ia menetapkan bendera (dibatalkan = benar) untuk memberi isyarat bahawa gelung harus keluar. Sebarang peristiwa yang tinggal dalam baris gilir masih akan diproses sebelum penjana selesai.

  5. Pembersihan:
    Selepas pembatalan, penjana akan menggunakan fungsi berhenti melanggan (jika disediakan) untuk melakukan sebarang pembersihan yang diperlukan. Ini amat penting untuk menyahlanggan sistem luaran seperti Redis atau membersihkan sumber.

Contoh: Mendengar Redis Pub/Sub

Mari kita lihat bagaimana kita boleh menggunakan lelaran acara ini untuk mendengar Redis Pub/Sub dan melelakan secara tak segerak ke atas mesej masuk.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator<string>(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

Dalam contoh ini, kami menggunakan createEventIterator untuk melanggan saluran Redis Pub/Sub dan melelang secara tak segerak ke atas mesej. Setiap kali mesej baharu tiba, ia dipancarkan ke dalam penjana, di mana kami boleh memprosesnya dalam masa nyata. Jika mesej tertentu (cth., "STOP") diterima, kami memutuskan gelung dan berhenti melanggan Redis.

Contoh: Menggunakan EventEmitter

Begini cara anda boleh menggunakan createEventIterator dengan EventEmitter Node.js:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator<string>(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

Dalam contoh ini:

  • Kami menggunakan EventEmitter untuk memancarkan acara, yang ditangkap oleh createEventIterator.
  • Pelajar mendengar acara 'data' dan memprosesnya secara tidak segerak.
  • Sama seperti contoh Redis, kita boleh menghentikan lelaran apabila acara tertentu ('STOP') diterima.

Faedah Pendekatan Ini

  • Kawalan Asynchronous: Dengan memanfaatkan AsyncGenerator, kami boleh mengendalikan acara secara tidak segerak, memprosesnya mengikut kadar kami sendiri dan menjeda pemprosesan apabila diperlukan.

  • Pembatalan: Keupayaan untuk membatalkan strim acara pada bila-bila masa menjadikan pendekatan ini fleksibel, terutamanya dalam senario dunia sebenar di mana sambungan mungkin perlu ditutup dengan baik.

  • Tujuan Umum: Lelaran ini boleh digunakan untuk sebarang pemancar acara atau sistem Pub/Sub, menjadikannya serba boleh untuk aplikasi yang berbeza.

Kesimpulan

Seni bina dipacu acara ialah asas kepada banyak aplikasi web moden, tetapi ia boleh menjadi sukar untuk diurus apabila kita perlu mengawal aliran acara secara tidak segerak. Dengan kuasa AsyncGenerator dalam TypeScript, anda boleh membina penyelesaian yang elegan seperti lelaran acara ini, menjadikan kod pengendalian acara anda lebih bersih dan lebih mudah diselenggara.

Saya harap siaran ini membantu anda bermula dengan iterator async untuk pemancar acara anda sendiri. Jika anda mempunyai sebarang soalan atau pendapat, sila kongsikan dalam komen!

Atas ialah kandungan terperinci Mengulang Asynchronously Over Event Emitters dalam TypeScript dengan Async Generators. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn