Maison >interface Web >js tutoriel >Du stockage au flux : fournir des données MongoDB directement aux utilisateurs

Du stockage au flux : fournir des données MongoDB directement aux utilisateurs

DDD
DDDoriginal
2025-01-04 04:34:38325parcourir

From Storage to Stream: Delivering MongoDB Data Directly to Users

Étape 1 : Curseur MongoDB

Voici comment nous avons configuré le curseur (en réutilisant votre extrait) :

const cursor =
    userObject?.data?.serviceProviderName === 'ZYRO'
        ? zyroTransactionModel.find(query).cursor()
        : finoTransactionModel.find(query).cursor();

console.log("Cursor created successfully");

Étape 2 : Configuration du fichier ZIP
Utilisez la bibliothèque Yazl pour diffuser des données CSV dans un fichier ZIP :

const yazl = require('yazl');
const zipfile = new yazl.ZipFile();

reply.raw.writeHead(200, {
    "Content-Type": "application/zip",
    "Content-Disposition": "attachment; filename=transactions.zip",
});

zipfile.outputStream.pipe(reply.raw);

const cleanup = async () => {
    console.log("Cleaning up resources...");
    zipfile.end(); // Finalize ZIP
    await cursor.close();
};
reply.raw.on("close", cleanup);
reply.raw.on("error", cleanup);

Étape 3 : Création de flux CSV dynamiques
Générez dynamiquement des données CSV et diffusez-les dans le fichier ZIP :

const createNewCSVStream = (headers) => {
    const csvStream = new Readable({ read() {} });
    csvStream.push(headers.join(",") + "\n"); // Add headers
    return csvStream;
};

const filteredHeaders = getHeaders(transactionDownloadFields, userObject?.state?.auth?.role);
const currentCSVStream = createNewCSVStream(filteredHeaders);

zipfile.addReadStream(currentCSVStream, "transactions_part_1.csv");

Étape 4 : diffuser des données MongoDB vers CSV
Diffusez les données de MongoDB directement dans le CSV :

cursor.on('data', (doc) => {
    const csvRow = filteredHeaders.map(header => doc[header.key] || '').join(',');
    currentCSVStream.push(csvRow + '\n'); // Write row
});

cursor.on('end', () => {
    currentCSVStream.push(null); // End the stream
    zipfile.end(); // Finalize the ZIP
});

Étape 5 : Traitement des données du curseur MongoDB
Diffusez des documents à partir du curseur MongoDB, transformez-les si nécessaire et écrivez dynamiquement des lignes dans le flux CSV :

try {
    for await (const doc of cursor) {
        if (clientDisconnected) {
            console.log("Client disconnected. Stopping processing...");
            break;
        }

        streamedCount++;
        rowCount++;

        let row = "";
        const filteredHeaders = getHeaders(
            transactionDownloadFields,
            userObject?.state?.auth?.role
        );

        for (let i = 0; i < filteredHeaders.length; i++) {
            const field = filteredHeaders[i];

            // Fetch the corresponding field configuration from transactionDownloadFields
            const originalField = transactionDownloadFields.find((f) => f.value === field.value);

            // Get the value from the transaction document
            let value = getValueFromTransaction(doc, field.value);

            // Apply transformation if the field has a transform function
            if (originalField?.transform) {
                value = originalField.transform(value);
            }

            // Enclose the value in double quotes
            value = value !== undefined ? `"${value}"` : '"N/A"';
            row += (i > 0 ? "," : "") + value;
        }
        row += "\n";
        currentCSVStream.push(row);

        // Check if the row count has reached the threshold for the current CSV file
        if (rowCount >= MAX_ROWS_PER_FILE) {
            console.log(`Threshold reached for file ${fileIndex - 1}. Starting new file...`);
            currentCSVStream.push(null); // End the current CSV stream
            currentCSVStream = createNewCSVStream(); // Start a new stream
            rowCount = 0; // Reset the row count
        }
    }

    // Finalize the current CSV stream if it has data
    if (currentCSVStream) {
        currentCSVStream.push(null);
    }

    // Finalize the ZIP file
    zipfile.end();
    console.log(`Successfully streamed ${streamedCount} rows across ${fileIndex - 1} files.`);
} catch (error) {
    console.error("Error during processing:", error);
    if (!headersSent) reply.status(500).send({ error: "Failed to generate ZIP file" });
} finally {
    // Cleanup: Close the MongoDB cursor
    await cursor.close().catch((err) => console.error("Error closing cursor:", err));
}

Résumé

Itération du document utilisant pour wait...of :

Diffuse efficacement les documents un par un à partir du curseur MongoDB.
Permet le traitement en temps réel sans charger toutes les données en mémoire.

  • Génération dynamique de lignes CSV :

Construit chaque ligne de manière dynamique en itérant sur les en-têtes filtrés.
Applique les transformations à l'aide d'une fonction de transformation, si elle est définie dans transactionDownloadFields.
Seuil de ligne et fractionnement de fichiers :

Surveille le nombre de lignes par rapport au seuil (MAX_ROWS_PER_FILE).
Met fin au flux CSV actuel et en démarre un nouveau lorsque le seuil est atteint.

  • Gestion des erreurs :

Enregistre et envoie une réponse d'erreur si un problème survient pendant le traitement.
Assure un nettoyage approprié en fermant le curseur MongoDB dans le bloc final.

  • Finalisation des flux :

Pousse null pour mettre fin au flux CSV actuel.
Complète le fichier ZIP une fois que toutes les lignes sont traitées.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn