Rumah > Artikel > hujung hadapan web > Mengulang Asynchronously Over Event Emitters dalam TypeScript dengan Async Generators
Dalam pembangunan web moden, kami sering berurusan dengan acara, sama ada mengendalikan mesej WebSocket masuk, acara dihantar pelayan (SSE) atau strim data daripada perkhidmatan seperti Redis Pub/Sub. Walaupun Node.js menyediakan keupayaan dipacu peristiwa, ia tidak mempunyai cara yang luar biasa untuk tidak segerak melelakan acara menggunakan untuk menunggu...gelung.
Dalam siaran ini, saya akan membimbing anda melalui cara yang mudah tetapi berkesan untuk mencipta lelaran acara tak segerak menggunakan TypeScript dan AsyncGenerator. Pendekatan ini direka bentuk untuk membolehkan anda menggunakan peristiwa daripada sebarang jenis pemancar peristiwa dengan cara yang bersih dan boleh diramal, dengan kawalan penuh ke atas logik pembatalan dan pembersihan.
Dalam salah satu projek saya baru-baru ini, saya perlu mendengar saluran Redis Pub/Sub dan menghantar acara dihantar pelayan (SSE) secara tidak segerak kepada pelanggan yang disambungkan. Cabarannya ialah mengendalikan acara masuk tanpa membebankan sistem sambil membenarkan pengguna membatalkan strim acara pada bila-bila masa.
Penyelesaian? Penyalur peristiwa yang menukarkan mana-mana pemancar peristiwa (seperti Redis Pub/Sub) kepada lelaran tak segerak. Ini membolehkan kami memproses acara dengan cara terkawal dan mengendalikan pembatalan dengan anggun apabila perlu.
Mari kita mendalami pelaksanaannya.
export type Context<T> = { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise<void>; export type Subscriber<T> = ( context: Context<T>, ) => void | CleanupFn | Promise<CleanupFn | void>; export async function* createEventIterator<T>( subscriber: Subscriber<T>, ): AsyncGenerator<T> { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise<void>((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
Fungsi ini menerima fungsi pelanggan yang boleh anda sambungkan ke mana-mana pemancar acara atau sistem pub/sub. Pelanggan menyediakan dua kaedah penting:
Fungsi ini mengembalikan AsyncGenerator
Objek Konteks:
Konteks
Barisan Acara:
Peristiwa: Tatasusunan T[] berfungsi sebagai penimbal untuk menyimpan peristiwa yang dipancarkan. Penjana akan memproses peristiwa ini satu demi satu. Jika tiada acara dalam baris gilir, ia akan menunggu untuk acara seterusnya dipancarkan.
Pancarkan Logik:
Fungsi emit menambah acara baharu pada baris gilir dan menyelesaikan sebarang janji yang belum selesai (iaitu, jika penjana sedang menunggu acara baharu).
Pembatalan:
Jika fungsi batal dipanggil, ia menetapkan bendera (dibatalkan = benar) untuk memberi isyarat bahawa gelung harus keluar. Sebarang peristiwa yang tinggal dalam baris gilir masih akan diproses sebelum penjana selesai.
Pembersihan:
Selepas pembatalan, penjana akan menggunakan fungsi berhenti melanggan (jika disediakan) untuk melakukan sebarang pembersihan yang diperlukan. Ini amat penting untuk menyahlanggan sistem luaran seperti Redis atau membersihkan sumber.
Mari kita lihat bagaimana kita boleh menggunakan lelaran acara ini untuk mendengar Redis Pub/Sub dan melelakan secara tak segerak ke atas mesej masuk.
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator<string>(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
Dalam contoh ini, kami menggunakan createEventIterator untuk melanggan saluran Redis Pub/Sub dan melelang secara tak segerak ke atas mesej. Setiap kali mesej baharu tiba, ia dipancarkan ke dalam penjana, di mana kami boleh memprosesnya dalam masa nyata. Jika mesej tertentu (cth., "STOP") diterima, kami memutuskan gelung dan berhenti melanggan Redis.
Begini cara anda boleh menggunakan createEventIterator dengan EventEmitter Node.js:
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator<string>(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
Dalam contoh ini:
Kawalan Asynchronous: Dengan memanfaatkan AsyncGenerator, kami boleh mengendalikan acara secara tidak segerak, memprosesnya mengikut kadar kami sendiri dan menjeda pemprosesan apabila diperlukan.
Pembatalan: Keupayaan untuk membatalkan strim acara pada bila-bila masa menjadikan pendekatan ini fleksibel, terutamanya dalam senario dunia sebenar di mana sambungan mungkin perlu ditutup dengan baik.
Tujuan Umum: Lelaran ini boleh digunakan untuk sebarang pemancar acara atau sistem Pub/Sub, menjadikannya serba boleh untuk aplikasi yang berbeza.
Seni bina dipacu acara ialah asas kepada banyak aplikasi web moden, tetapi ia boleh menjadi sukar untuk diurus apabila kita perlu mengawal aliran acara secara tidak segerak. Dengan kuasa AsyncGenerator dalam TypeScript, anda boleh membina penyelesaian yang elegan seperti lelaran acara ini, menjadikan kod pengendalian acara anda lebih bersih dan lebih mudah diselenggara.
Saya harap siaran ini membantu anda bermula dengan iterator async untuk pemancar acara anda sendiri. Jika anda mempunyai sebarang soalan atau pendapat, sila kongsikan dalam komen!
Atas ialah kandungan terperinci Mengulang Asynchronously Over Event Emitters dalam TypeScript dengan Async Generators. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!