>웹 프론트엔드 >JS 튜토리얼 >스토리지에서 스트림으로: MongoDB 데이터를 사용자에게 직접 전달

스토리지에서 스트림으로: MongoDB 데이터를 사용자에게 직접 전달

DDD
DDD원래의
2025-01-04 04:34:38360검색

From Storage to Stream: Delivering MongoDB Data Directly to Users

1단계: MongoDB 커서

커서를 설정하는 방법은 다음과 같습니다(스니펫 재사용).

const cursor =
    userObject?.data?.serviceProviderName === 'ZYRO'
        ? zyroTransactionModel.find(query).cursor()
        : finoTransactionModel.find(query).cursor();

console.log("Cursor created successfully");

2단계: ZIP 파일 설정
yazl 라이브러리를 사용하여 CSV 데이터를 ZIP 파일로 스트리밍합니다.

const yazl = require('yazl');
const zipfile = new yazl.ZipFile();

reply.raw.writeHead(200, {
    "Content-Type": "application/zip",
    "Content-Disposition": "attachment; filename=transactions.zip",
});

zipfile.outputStream.pipe(reply.raw);

const cleanup = async () => {
    console.log("Cleaning up resources...");
    zipfile.end(); // Finalize ZIP
    await cursor.close();
};
reply.raw.on("close", cleanup);
reply.raw.on("error", cleanup);

3단계: 동적 CSV 스트림 생성
CSV 데이터를 동적으로 생성하고 ZIP 파일로 스트리밍합니다.

const createNewCSVStream = (headers) => {
    const csvStream = new Readable({ read() {} });
    csvStream.push(headers.join(",") + "\n"); // Add headers
    return csvStream;
};

const filteredHeaders = getHeaders(transactionDownloadFields, userObject?.state?.auth?.role);
const currentCSVStream = createNewCSVStream(filteredHeaders);

zipfile.addReadStream(currentCSVStream, "transactions_part_1.csv");

4단계: MongoDB 데이터를 CSV로 스트리밍
MongoDB의 데이터를 CSV로 직접 스트리밍합니다.

cursor.on('data', (doc) => {
    const csvRow = filteredHeaders.map(header => doc[header.key] || '').join(',');
    currentCSVStream.push(csvRow + '\n'); // Write row
});

cursor.on('end', () => {
    currentCSVStream.push(null); // End the stream
    zipfile.end(); // Finalize the ZIP
});

5단계: MongoDB 커서에서 데이터 처리
MongoDB 커서에서 문서를 스트리밍하고, 필요에 따라 변환하고, CSV 스트림에 동적으로 행을 씁니다.

try {
    for await (const doc of cursor) {
        if (clientDisconnected) {
            console.log("Client disconnected. Stopping processing...");
            break;
        }

        streamedCount++;
        rowCount++;

        let row = "";
        const filteredHeaders = getHeaders(
            transactionDownloadFields,
            userObject?.state?.auth?.role
        );

        for (let i = 0; i < filteredHeaders.length; i++) {
            const field = filteredHeaders[i];

            // Fetch the corresponding field configuration from transactionDownloadFields
            const originalField = transactionDownloadFields.find((f) => f.value === field.value);

            // Get the value from the transaction document
            let value = getValueFromTransaction(doc, field.value);

            // Apply transformation if the field has a transform function
            if (originalField?.transform) {
                value = originalField.transform(value);
            }

            // Enclose the value in double quotes
            value = value !== undefined ? `"${value}"` : '"N/A"';
            row += (i > 0 ? "," : "") + value;
        }
        row += "\n";
        currentCSVStream.push(row);

        // Check if the row count has reached the threshold for the current CSV file
        if (rowCount >= MAX_ROWS_PER_FILE) {
            console.log(`Threshold reached for file ${fileIndex - 1}. Starting new file...`);
            currentCSVStream.push(null); // End the current CSV stream
            currentCSVStream = createNewCSVStream(); // Start a new stream
            rowCount = 0; // Reset the row count
        }
    }

    // Finalize the current CSV stream if it has data
    if (currentCSVStream) {
        currentCSVStream.push(null);
    }

    // Finalize the ZIP file
    zipfile.end();
    console.log(`Successfully streamed ${streamedCount} rows across ${fileIndex - 1} files.`);
} catch (error) {
    console.error("Error during processing:", error);
    if (!headersSent) reply.status(500).send({ error: "Failed to generate ZIP file" });
} finally {
    // Cleanup: Close the MongoDB cursor
    await cursor.close().catch((err) => console.error("Error closing cursor:", err));
}

요약

await...of를 사용하여 문서 반복:

MongoDB 커서에서 문서를 하나씩 효율적으로 스트리밍합니다.
모든 데이터를 메모리에 로드하지 않고 실시간 처리가 가능합니다.

  • 동적 CSV 행 생성:

filteredHeaders를 반복하여 각 행을 동적으로 구성합니다.
transactionDownloadFields에 정의된 경우 변환 함수를 사용하여 변환을 적용합니다.
행 임계값 및 파일 분할:

임계값(MAX_ROWS_PER_FILE)을 기준으로 행 수를 모니터링합니다.
현재 CSV 스트림을 종료하고 임계값에 도달하면 새 CSV 스트림을 시작합니다.

  • 오류 처리:

처리 중 문제가 발생하면 오류 응답을 기록하고 보냅니다.
finally 블록에서 MongoDB 커서를 닫아 적절한 정리를 보장합니다.

  • 스트림 마무리 중:

현재 CSV 스트림을 종료하려면 null을 푸시합니다.
모든 행이 처리되면 ZIP 파일을 완성합니다.

위 내용은 스토리지에서 스트림으로: MongoDB 데이터를 사용자에게 직접 전달의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.