Vom Speicher zum Stream: MongoDB-Daten direkt an Benutzer liefern

From Storage to Stream: Delivering MongoDB Data Directly to Users

Schritt 1: MongoDB-Cursor

So richten wir den Cursor ein (unter Wiederverwendung Ihres Snippets):

const cursor =
    userObject?.data?.serviceProviderName === 'ZYRO'
        ? zyroTransactionModel.find(query).cursor()
        : finoTransactionModel.find(query).cursor();

console.log("Cursor created successfully");

Schritt 2: Einrichten der ZIP-Datei
Verwenden Sie die yazl-Bibliothek, um CSV-Daten in eine ZIP-Datei zu streamen:

const yazl = require('yazl');
const zipfile = new yazl.ZipFile();

reply.raw.writeHead(200, {
    "Content-Type": "application/zip",
    "Content-Disposition": "attachment; filename=transactions.zip",


const cleanup = async () => {
    console.log("Cleaning up resources...");
    zipfile.end(); // Finalize ZIP
    await cursor.close();
reply.raw.on("close", cleanup);
reply.raw.on("error", cleanup);

Schritt 3: Dynamische CSV-Streams erstellen
CSV-Daten dynamisch generieren und in die ZIP-Datei streamen:

const createNewCSVStream = (headers) => {
    const csvStream = new Readable({ read() {} });
    csvStream.push(headers.join(",") + "\n"); // Add headers
    return csvStream;

const filteredHeaders = getHeaders(transactionDownloadFields, userObject?.state?.auth?.role);
const currentCSVStream = createNewCSVStream(filteredHeaders);

zipfile.addReadStream(currentCSVStream, "transactions_part_1.csv");

Schritt 4: MongoDB-Daten in CSV streamen
Streamen Sie die Daten von MongoDB direkt in die CSV:

cursor.on('data', (doc) => {
    const csvRow = filteredHeaders.map(header => doc[header.key] || '').join(',');
    currentCSVStream.push(csvRow + '\n'); // Write row

cursor.on('end', () => {
    currentCSVStream.push(null); // End the stream
    zipfile.end(); // Finalize the ZIP

Schritt 5: Daten vom MongoDB-Cursor verarbeiten
Streamen Sie Dokumente vom MongoDB-Cursor, transformieren Sie sie nach Bedarf und schreiben Sie Zeilen dynamisch in den CSV-Stream:

try {
    for await (const doc of cursor) {
        if (clientDisconnected) {
            console.log("Client disconnected. Stopping processing...");


        let row = "";
        const filteredHeaders = getHeaders(

        for (let i = 0; i < filteredHeaders.length; i++) {
            const field = filteredHeaders[i];

            // Fetch the corresponding field configuration from transactionDownloadFields
            const originalField = transactionDownloadFields.find((f) => f.value === field.value);

            // Get the value from the transaction document
            let value = getValueFromTransaction(doc, field.value);

            // Apply transformation if the field has a transform function
            if (originalField?.transform) {
                value = originalField.transform(value);

            // Enclose the value in double quotes
            value = value !== undefined ? `"${value}"` : '"N/A"';
            row += (i > 0 ? "," : "") + value;
        row += "\n";

        // Check if the row count has reached the threshold for the current CSV file
        if (rowCount >= MAX_ROWS_PER_FILE) {
            console.log(`Threshold reached for file ${fileIndex - 1}. Starting new file...`);
            currentCSVStream.push(null); // End the current CSV stream
            currentCSVStream = createNewCSVStream(); // Start a new stream
            rowCount = 0; // Reset the row count

    // Finalize the current CSV stream if it has data
    if (currentCSVStream) {

    // Finalize the ZIP file
    console.log(`Successfully streamed ${streamedCount} rows across ${fileIndex - 1} files.`);
} catch (error) {
    console.error("Error during processing:", error);
    if (!headersSent) reply.status(500).send({ error: "Failed to generate ZIP file" });
} finally {
    // Cleanup: Close the MongoDB cursor
    await cursor.close().catch((err) => console.error("Error closing cursor:", err));


Dokumentiteration mit for waiting...of:

Streamt Dokumente einzeln effizient vom MongoDB-Cursor.
Ermöglicht Echtzeitverarbeitung, ohne alle Daten in den Speicher laden zu müssen.

  • Dynamische CSV-Zeilengenerierung:

Erstellt jede Zeile dynamisch durch Iteration über filteredHeaders.
Wendet Transformationen mithilfe einer Transformationsfunktion an, sofern in „transactionDownloadFields.
“ definiert Zeilenschwellenwert und Dateiaufteilung:

Überwacht die Zeilenanzahl anhand des Schwellenwerts (MAX_ROWS_PER_FILE).
Beendet den aktuellen CSV-Stream und startet einen neuen, wenn der Schwellenwert erreicht ist.

  • Fehlerbehandlung:

Protokolliert und sendet eine Fehlerantwort, wenn während der Verarbeitung ein Problem auftritt.
Stellt eine ordnungsgemäße Bereinigung sicher, indem der MongoDB-Cursor im „finally“-Block geschlossen wird.

  • Streams abschließen:

Schiebt Null, um den aktuellen CSV-Stream zu beenden.
Vervollständigt die ZIP-Datei, sobald alle Zeilen verarbeitet wurden.

