Heim >Web-Frontend >js-Tutorial >Asynchrones Iterieren über Ereignisemitter in TypeScript mit Async-Generatoren

Asynchrones Iterieren über Ereignisemitter in TypeScript mit Async-Generatoren

Barbara Streisand
Barbara StreisandOriginal
2024-09-30 06:26:39993Durchsuche

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Einführung

In der modernen Webentwicklung beschäftigen wir uns häufig mit Ereignissen, sei es die Verarbeitung eingehender WebSocket-Nachrichten, vom Server gesendeter Ereignisse (SSE) oder Datenströme von Diensten wie Redis Pub/Sub. Obwohl Node.js ereignisgesteuerte Funktionen bereitstellt, fehlt eine sofort einsatzbereite Möglichkeit, mithilfe von for-await...of-Schleifen asynchron über Ereignisse zu iterieren.

In diesem Beitrag zeige ich Ihnen eine einfache, aber leistungsstarke Methode zum Erstellen eines asynchronen Ereignisiterators mit TypeScript und AsyncGenerator. Dieser Ansatz soll es Ihnen ermöglichen, Ereignisse von jeder Art von Ereignisemitter auf saubere und vorhersehbare Weise zu konsumieren, mit vollständiger Kontrolle über die Abbruch- und Bereinigungslogik.

Der Anwendungsfall: Redis Pub/Sub

In einem meiner letzten Projekte musste ich Redis Pub/Sub-Kanäle abhören und vom Server gesendete Ereignisse (SSE) asynchron an verbundene Clients versenden. Die Herausforderung bestand darin, eingehende Ereignisse zu verarbeiten, ohne das System zu überlasten, und gleichzeitig dem Verbraucher die Möglichkeit zu geben, den Ereignisstream jederzeit abzubrechen.

Die Lösung? Ein Ereignisiterator, der jeden Ereignisemitter (z. B. Redis Pub/Sub) in ein asynchrones Iterable umwandelt. Dies ermöglicht uns eine kontrollierte Verarbeitung von Ereignissen und eine reibungslose Stornierung bei Bedarf.

Lassen Sie uns in die Umsetzung eintauchen.

Der Kodex

export type Context<T> = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise<void>;

export type Subscriber<T> = (
    context: Context<T>,
) => void | CleanupFn | Promise<CleanupFn | void>;

export async function* createEventIterator<T>(
    subscriber: Subscriber<T>,
): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

Wie es funktioniert

Diese Funktion akzeptiert eine Abonnentenfunktion, die Sie in jeden Event-Emitter oder Pub/Sub-System einbinden können. Der Abonnent stellt zwei wesentliche Methoden zur Verfügung:

  1. emit: Ermöglicht dem Abonnenten, neue Ereignisse in den Iterator zu übertragen.
  2. Abbrechen: Bietet eine Möglichkeit zu signalisieren, dass die Iteration beendet werden soll.

Die Funktion gibt einen AsyncGenerator zurück, der es Ihnen ermöglicht, mithilfe einer for-await...of-Schleife über Ereignisse zu iterieren.

Den Code aufschlüsseln

  1. Kontextobjekt:
    Der Kontext Typ bietet eine Schnittstelle zum Ausgeben neuer Ereignisse oder zum Kündigen des Abonnements. Der Abonnent nutzt diesen Kontext, um den Ablauf von Ereignissen zu steuern.

  2. Ereigniswarteschlange:
    Das Array events: T[] dient als Puffer zum Speichern ausgegebener Ereignisse. Der Generator verarbeitet diese Ereignisse nacheinander. Wenn sich keine Ereignisse in der Warteschlange befinden, wird auf die Ausgabe des nächsten Ereignisses gewartet.

  3. Logik ausgeben:
    Die Emit-Funktion fügt der Warteschlange neue Ereignisse hinzu und löst alle ausstehenden Versprechen auf (d. h. wenn der Generator auf neue Ereignisse wartet).

  4. Stornierung:
    Wenn die Abbruchfunktion aufgerufen wird, setzt sie ein Flag (abgebrochen = wahr), um zu signalisieren, dass die Schleife beendet werden soll. Alle verbleibenden Ereignisse in der Warteschlange werden noch verarbeitet, bevor der Generator abgeschlossen ist.

  5. Aufräumen:
    Nach dem Abbruch ruft der Generator die Abmeldefunktion auf (falls vorhanden), um alle erforderlichen Bereinigungen durchzuführen. Dies ist besonders wichtig, um sich von externen Systemen wie Redis abzumelden oder Ressourcen zu bereinigen.

Beispiel: Anhören von Redis Pub/Sub

Sehen wir uns an, wie wir diesen Ereignisiterator verwenden können, um Redis Pub/Sub abzuhören und die eingehenden Nachrichten asynchron zu iterieren.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator<string>(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

In diesem Beispiel verwenden wir createEventIterator, um einen Redis Pub/Sub-Kanal zu abonnieren und die Nachrichten asynchron zu durchlaufen. Jedes Mal, wenn eine neue Nachricht eintrifft, wird sie an den Generator gesendet, wo wir sie in Echtzeit verarbeiten können. Wenn eine bestimmte Nachricht (z. B. „STOP“) empfangen wird, unterbrechen wir die Schleife und melden uns von Redis ab.

Beispiel: Verwendung von EventEmitter

So können Sie createEventIterator mit dem EventEmitter von Node.js verwenden:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator<string>(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

In diesem Beispiel:

  • Wir verwenden EventEmitter, um Ereignisse auszusenden, die von createEventIterator erfasst werden.
  • Der Iterator wartet auf das „Daten“-Ereignis und verarbeitet es asynchron.
  • Ähnlich wie im Redis-Beispiel können wir die Iteration stoppen, wenn ein bestimmtes Ereignis („STOP“) empfangen wird.

Vorteile dieses Ansatzes

  • Asynchrone Steuerung: Durch die Nutzung des AsyncGenerator können wir Ereignisse asynchron verarbeiten, sie in unserem eigenen Tempo verarbeiten und die Verarbeitung bei Bedarf anhalten.

  • Stornierung: Die Möglichkeit, den Ereignisstream jederzeit abzubrechen, macht diesen Ansatz flexibel, insbesondere in realen Szenarien, in denen Verbindungen möglicherweise ordnungsgemäß geschlossen werden müssen.

  • Allzweck: Dieser Iterator kann für jeden Event-Emitter oder jedes Pub/Sub-System verwendet werden, wodurch er für verschiedene Anwendungen vielseitig einsetzbar ist.

Abschluss

Ereignisgesteuerte Architekturen sind ein Eckpfeiler vieler moderner Webanwendungen, aber ihre Verwaltung kann schwierig werden, wenn wir den Fluss von Ereignissen asynchron steuern müssen. Mit der Leistungsfähigkeit von AsyncGenerator in TypeScript können Sie elegante Lösungen wie diesen Event-Iterator erstellen, wodurch Ihr Event-Handling-Code sauberer und einfacher zu warten ist.

Ich hoffe, dieser Beitrag hilft Ihnen beim Einstieg in asynchrone Iteratoren für Ihre eigenen Event-Emitter. Wenn Sie Fragen oder Gedanken haben, teilen Sie diese gerne in den Kommentaren mit!

Das obige ist der detaillierte Inhalt vonAsynchrones Iterieren über Ereignisemitter in TypeScript mit Async-Generatoren. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn