ホームページ >ウェブフロントエンド >jsチュートリアル >ストレージからストリームへ: MongoDB データをユーザーに直接配信

ストレージからストリームへ: MongoDB データをユーザーに直接配信

DDD
DDDオリジナル
2025-01-04 04:34:38360ブラウズ

From Storage to Stream: Delivering MongoDB Data Directly to Users

ステップ 1: MongoDB カーソル

カーソルの設定方法は次のとおりです (スニペットを再利用)。

const cursor =
    userObject?.data?.serviceProviderName === 'ZYRO'
        ? zyroTransactionModel.find(query).cursor()
        : finoTransactionModel.find(query).cursor();

console.log("Cursor created successfully");

ステップ 2: ZIP ファイルのセットアップ
yazl ライブラリを使用して、CSV データを ZIP ファイルにストリーミングします:

const yazl = require('yazl');
const zipfile = new yazl.ZipFile();

reply.raw.writeHead(200, {
    "Content-Type": "application/zip",
    "Content-Disposition": "attachment; filename=transactions.zip",
});

zipfile.outputStream.pipe(reply.raw);

const cleanup = async () => {
    console.log("Cleaning up resources...");
    zipfile.end(); // Finalize ZIP
    await cursor.close();
};
reply.raw.on("close", cleanup);
reply.raw.on("error", cleanup);

ステップ 3: 動的 CSV ストリームの作成
CSV データを動的に生成し、ZIP ファイルにストリーミングします:

const createNewCSVStream = (headers) => {
    const csvStream = new Readable({ read() {} });
    csvStream.push(headers.join(",") + "\n"); // Add headers
    return csvStream;
};

const filteredHeaders = getHeaders(transactionDownloadFields, userObject?.state?.auth?.role);
const currentCSVStream = createNewCSVStream(filteredHeaders);

zipfile.addReadStream(currentCSVStream, "transactions_part_1.csv");

ステップ 4: MongoDB データを CSV にストリーミングする
データを MongoDB から CSV に直接ストリーミングします:

cursor.on('data', (doc) => {
    const csvRow = filteredHeaders.map(header => doc[header.key] || '').join(',');
    currentCSVStream.push(csvRow + '\n'); // Write row
});

cursor.on('end', () => {
    currentCSVStream.push(null); // End the stream
    zipfile.end(); // Finalize the ZIP
});

ステップ 5: MongoDB カーソルからのデータの処理
MongoDB カーソルからドキュメントをストリーミングし、必要に応じて変換し、動的に CSV ストリームに行を書き込みます:

try {
    for await (const doc of cursor) {
        if (clientDisconnected) {
            console.log("Client disconnected. Stopping processing...");
            break;
        }

        streamedCount++;
        rowCount++;

        let row = "";
        const filteredHeaders = getHeaders(
            transactionDownloadFields,
            userObject?.state?.auth?.role
        );

        for (let i = 0; i < filteredHeaders.length; i++) {
            const field = filteredHeaders[i];

            // Fetch the corresponding field configuration from transactionDownloadFields
            const originalField = transactionDownloadFields.find((f) => f.value === field.value);

            // Get the value from the transaction document
            let value = getValueFromTransaction(doc, field.value);

            // Apply transformation if the field has a transform function
            if (originalField?.transform) {
                value = originalField.transform(value);
            }

            // Enclose the value in double quotes
            value = value !== undefined ? `"${value}"` : '"N/A"';
            row += (i > 0 ? "," : "") + value;
        }
        row += "\n";
        currentCSVStream.push(row);

        // Check if the row count has reached the threshold for the current CSV file
        if (rowCount >= MAX_ROWS_PER_FILE) {
            console.log(`Threshold reached for file ${fileIndex - 1}. Starting new file...`);
            currentCSVStream.push(null); // End the current CSV stream
            currentCSVStream = createNewCSVStream(); // Start a new stream
            rowCount = 0; // Reset the row count
        }
    }

    // Finalize the current CSV stream if it has data
    if (currentCSVStream) {
        currentCSVStream.push(null);
    }

    // Finalize the ZIP file
    zipfile.end();
    console.log(`Successfully streamed ${streamedCount} rows across ${fileIndex - 1} files.`);
} catch (error) {
    console.error("Error during processing:", error);
    if (!headersSent) reply.status(500).send({ error: "Failed to generate ZIP file" });
} finally {
    // Cleanup: Close the MongoDB cursor
    await cursor.close().catch((err) => console.error("Error closing cursor:", err));
}

概要

for await...of を使用したドキュメントの反復:

MongoDB カーソルからドキュメントを 1 つずつ効率的にストリーミングします。
すべてのデータをメモリにロードせずにリアルタイム処理を可能にします。

  • 動的 CSV 行の生成:

filteredHeaders を反復処理することで各行を動的に構築します。
transactionDownloadFields.
で定義されている場合、変換関数を使用して変換を適用します。 行のしきい値とファイルの分割:

しきい値 (MAX_ROWS_PER_FILE) に対して行数を監視します。
しきい値に達すると、現在の CSV ストリームを終了し、新しい CSV ストリームを開始します。

  • エラー処理:

処理中に問題が発生した場合は、ログを記録し、エラー応答を送信します。
Finally ブロックで MongoDB カーソルを閉じることで、適切なクリーンアップを確保します。

  • ストリームを終了中:

null をプッシュして現在の CSV ストリームを終了します。
すべての行が処理されると ZIP ファイルが完成します。

以上がストレージからストリームへ: MongoDB データをユーザーに直接配信の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。