Maison >interface Web >js tutoriel >Itération asynchrone sur les émetteurs d'événements dans TypeScript avec des générateurs asynchrones

Itération asynchrone sur les émetteurs d'événements dans TypeScript avec des générateurs asynchrones

Barbara Streisand
Barbara Streisandoriginal
2024-09-30 06:26:39993parcourir

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Introduction

Dans le développement Web moderne, nous traitons souvent d'événements, qu'il s'agisse de la gestion des messages WebSocket entrants, des événements envoyés par le serveur (SSE) ou des flux de données provenant de services comme Redis Pub/Sub. Bien que Node.js fournisse des fonctionnalités basées sur les événements, il lui manque un moyen prêt à l'emploi pour parcourir les événements de manière asynchrone à l'aide des boucles for wait...of.

Dans cet article, je vais vous expliquer un moyen simple mais puissant de créer un itérateur d'événement asynchrone à l'aide de TypeScript et AsyncGenerator. Cette approche est conçue pour vous permettre de consommer des événements provenant de tout type d'émetteur d'événements de manière propre et prévisible, avec un contrôle total sur la logique d'annulation et de nettoyage.

Le cas d'utilisation : Redis Pub/Sub

Dans l'un de mes projets récents, j'avais besoin d'écouter les canaux Redis Pub/Sub et de distribuer des événements envoyés par le serveur (SSE) de manière asynchrone aux clients connectés. Le défi consistait à gérer les événements entrants sans surcharger le système tout en permettant au consommateur d'annuler le flux d'événements à tout moment.

La solution ? Un itérateur d'événement qui convertit n'importe quel émetteur d'événement (tel que Redis Pub/Sub) en un itérable asynchrone. Cela nous permet de traiter les événements de manière contrôlée et de gérer avec élégance les annulations si nécessaire.

Plongeons dans la mise en œuvre.

Le code

export type Context<T> = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise<void>;

export type Subscriber<T> = (
    context: Context<T>,
) => void | CleanupFn | Promise<CleanupFn | void>;

export async function* createEventIterator<T>(
    subscriber: Subscriber<T>,
): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

Comment ça marche

Cette fonction accepte une fonction d'abonné que vous pouvez connecter à n'importe quel émetteur d'événements ou système pub/sous. L'abonné propose deux méthodes essentielles :

  1. emit : permet à l'abonné de pousser de nouveaux événements dans l'itérateur.
  2. annuler : Fournit un moyen de signaler que l'itération doit s'arrêter.

La fonction renvoie un AsyncGenerator, vous permettant de parcourir les événements à l'aide d'une boucle for wait...of.

Décrypter le code

  1. Objet contextuel :
    Le contexte type fournit une interface pour émettre de nouveaux événements ou annuler l'abonnement. L'abonné utilise ce contexte pour contrôler le flux des événements.

  2. File d'attente des événements :
    Le tableau events: T[] sert de tampon pour stocker les événements émis. Le générateur traitera ces événements un par un. S'il n'y a aucun événement dans la file d'attente, il attendra que le prochain événement soit émis.

  3. Émettre la logique :
    La fonction d'émission ajoute de nouveaux événements à la file d'attente et résout toute promesse en attente (c'est-à-dire si le générateur attend de nouveaux événements).

  4. Annulation :
    Si la fonction d'annulation est appelée, elle définit un indicateur (annulé = vrai) pour signaler que la boucle doit se terminer. Tous les événements restants dans la file d'attente seront toujours traités avant la fin du générateur.

  5. Nettoyage :
    Après l'annulation, le générateur invoquera la fonction de désabonnement (si fournie) pour effectuer tout nettoyage nécessaire. Ceci est particulièrement important pour se désabonner de systèmes externes comme Redis ou pour nettoyer les ressources.

Exemple : écoute de Redis Pub/Sub

Voyons comment nous pouvons utiliser cet itérateur d'événement pour écouter Redis Pub/Sub et parcourir de manière asynchrone les messages entrants.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator<string>(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

Dans cet exemple, nous utilisons createEventIterator pour nous abonner à un canal Redis Pub/Sub et parcourir les messages de manière asynchrone. Chaque fois qu'un nouveau message arrive, il est émis dans le générateur, où nous pouvons le traiter en temps réel. Si un message spécifique (par exemple, "STOP") est reçu, nous rompons la boucle et nous désabonnons de Redis.

Exemple : utilisation d'EventEmitter

Voici comment utiliser createEventIterator avec EventEmitter de Node.js :

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator<string>(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

Dans cet exemple :

  • Nous utilisons EventEmitter pour émettre des événements, qui sont capturés par createEventIterator.
  • L'itérateur écoute l'événement 'data' et le traite de manière asynchrone.
  • Semblable à l'exemple Redis, nous pouvons arrêter l'itération lorsqu'un événement spécifique (« STOP ») est reçu.

Avantages de cette approche

  • Contrôle asynchrone : en tirant parti d'AsyncGenerator, nous pouvons gérer les événements de manière asynchrone, les traiter à notre propre rythme et suspendre le traitement si nécessaire.

  • Annulation : la possibilité d'annuler le flux d'événements à tout moment rend cette approche flexible, en particulier dans les scénarios réels où les connexions peuvent devoir être fermées correctement.

  • Utilisation générale : cet itérateur peut être utilisé pour n'importe quel émetteur d'événements ou système Pub/Sub, ce qui le rend polyvalent pour différentes applications.

Conclusion

Les architectures basées sur les événements sont la pierre angulaire de nombreuses applications Web modernes, mais elles peuvent devenir difficiles à gérer lorsque nous devons contrôler le flux des événements de manière asynchrone. Grâce à la puissance d'AsyncGenerator dans TypeScript, vous pouvez créer des solutions élégantes comme cet itérateur d'événements, rendant votre code de gestion des événements plus propre et plus facile à maintenir.

J'espère que cet article vous aidera à démarrer avec des itérateurs asynchrones pour vos propres émetteurs d'événements. Si vous avez des questions ou des idées, n'hésitez pas à les partager dans les commentaires !

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn