Step 1: The MongoDB Cursor
Here's how we set up the cursor (reusing your code snippet):
```javascript
const cursor =
  userObject?.data?.serviceProviderName === "ZYRO"
    ? zyroTransactionModel.find(query).cursor()
    : finoTransactionModel.find(query).cursor();

console.log("Cursor created successfully");
```
Step 2: Set Up the ZIP File
Use the yazl library to stream CSV data into a ZIP file:
```javascript
const yazl = require("yazl");

const zipfile = new yazl.ZipFile();

reply.raw.writeHead(200, {
  "Content-Type": "application/zip",
  "Content-Disposition": "attachment; filename=transactions.zip",
});

zipfile.outputStream.pipe(reply.raw);

const cleanup = async () => {
  console.log("Cleaning up resources...");
  zipfile.end(); // Finalize ZIP
  await cursor.close();
};

reply.raw.on("close", cleanup);
reply.raw.on("error", cleanup);
```
Step 3: Create a Dynamic CSV Stream
Generate CSV data dynamically and stream it into the ZIP file:
```javascript
const { Readable } = require("stream");

const createNewCSVStream = (headers) => {
  const csvStream = new Readable({ read() {} });
  csvStream.push(headers.join(",") + "\n"); // Add headers
  return csvStream;
};

const filteredHeaders = getHeaders(transactionDownloadFields, userObject?.state?.auth?.role);

// `let`, not `const`: Step 5 reassigns this when it rolls over to a new file.
let currentCSVStream = createNewCSVStream(filteredHeaders);

zipfile.addReadStream(currentCSVStream, "transactions_part_1.csv");
```
Step 4: Stream MongoDB Data to CSV
Stream data from MongoDB directly into the CSV:
```javascript
cursor.on("data", (doc) => {
  const csvRow = filteredHeaders.map((header) => doc[header.key] || "").join(",");
  currentCSVStream.push(csvRow + "\n"); // Write row
});

cursor.on("end", () => {
  currentCSVStream.push(null); // End the stream
  zipfile.end(); // Finalize the ZIP
});
```
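The per-document mapping inside the 'data' handler is a pure transformation, so it can be pulled out and checked in isolation (the sample document and header keys below are invented for the example):

```javascript
// Mirrors the row-building logic from the 'data' handler above.
const toCsvRow = (doc, headers) =>
  headers.map((header) => doc[header.key] || "").join(",");

const sampleHeaders = [{ key: "id" }, { key: "amount" }, { key: "status" }];
const sampleDoc = { id: "tx_1", amount: 250 }; // no status field

const row = toCsvRow(sampleDoc, sampleHeaders);
// Missing fields fall back to an empty string: "tx_1,250,"
```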
Step 5: Process Data from the MongoDB Cursor
Stream documents from the MongoDB cursor, transform them as needed, and write rows to the CSV stream on the fly:
```javascript
try {
  for await (const doc of cursor) {
    if (clientDisconnected) {
      console.log("Client disconnected. Stopping processing...");
      break;
    }
    streamedCount++;
    rowCount++;

    let row = "";
    const filteredHeaders = getHeaders(
      transactionDownloadFields,
      userObject?.state?.auth?.role
    );

    for (let i = 0; i < filteredHeaders.length; i++) {
      const field = filteredHeaders[i];

      // Fetch the corresponding field configuration from transactionDownloadFields
      const originalField = transactionDownloadFields.find((f) => f.value === field.value);

      // Get the value from the transaction document
      let value = getValueFromTransaction(doc, field.value);

      // Apply transformation if the field has a transform function
      if (originalField?.transform) {
        value = originalField.transform(value);
      }

      // Enclose the value in double quotes
      value = value !== undefined ? `"${value}"` : '"N/A"';
      row += (i > 0 ? "," : "") + value;
    }

    row += "\n";
    currentCSVStream.push(row);

    // Check if the row count has reached the threshold for the current CSV file
    if (rowCount >= MAX_ROWS_PER_FILE) {
      console.log(`Threshold reached for file ${fileIndex - 1}. Starting new file...`);
      currentCSVStream.push(null); // End the current CSV stream
      currentCSVStream = createNewCSVStream(filteredHeaders); // Start a new stream
      zipfile.addReadStream(currentCSVStream, `transactions_part_${fileIndex++}.csv`); // Register it in the ZIP
      rowCount = 0; // Reset the row count
    }
  }

  // Finalize the current CSV stream if it has data
  if (currentCSVStream) {
    currentCSVStream.push(null);
  }

  // Finalize the ZIP file
  zipfile.end();
  console.log(`Successfully streamed ${streamedCount} rows across ${fileIndex - 1} files.`);
} catch (error) {
  console.error("Error during processing:", error);
  if (!headersSent) reply.status(500).send({ error: "Failed to generate ZIP file" });
} finally {
  // Cleanup: Close the MongoDB cursor
  await cursor.close().catch((err) => console.error("Error closing cursor:", err));
}
```
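One caveat with the quoting above: a value that itself contains a double quote would break the CSV. A small RFC 4180-style escaping helper (hypothetical, not part of the original code) would harden the row builder:

```javascript
// Escape a value for CSV: wrap in quotes and double any embedded quotes.
// Hypothetical helper, keeping the same "N/A" fallback as the loop above.
const escapeCsvValue = (value) => {
  if (value === undefined) return '"N/A"';
  return '"' + String(value).replace(/"/g, '""') + '"';
};

const safe = escapeCsvValue('He said "hello", twice');
// '"He said ""hello"", twice"'
```

Swapping this in for the inline `` `"${value}"` `` would keep commas and quotes inside field values from corrupting rows.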
Summary
Document iteration with for await...of:
- Efficiently streams documents one at a time from the MongoDB cursor.
- Enables real-time processing without loading all the data into memory.
- Builds each row dynamically by iterating over filteredHeaders.
- Applies a transform function when one is defined in transactionDownloadFields.
Row threshold and file splitting:
- Tracks the row count against the threshold (MAX_ROWS_PER_FILE).
- Ends the current CSV stream and starts a new one when the threshold is reached.
- Logs and sends an error response if anything goes wrong during processing.
- Ensures proper cleanup by closing the MongoDB cursor in the finally block.
- Pushes null to terminate the current CSV stream.
- Finalizes the ZIP file once all rows are processed.
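The threshold logic from the summary reduces to a simple chunking rule, which can be sketched and verified in isolation (MAX_ROWS_PER_FILE is set to a tiny value here purely for illustration):

```javascript
const MAX_ROWS_PER_FILE = 3; // tiny value for illustration only

// Split an array of rows into per-file chunks, mirroring the
// rowCount >= MAX_ROWS_PER_FILE rollover check in Step 5.
const splitIntoFiles = (rows, maxRows) => {
  const files = [];
  for (let i = 0; i < rows.length; i += maxRows) {
    files.push(rows.slice(i, i + maxRows));
  }
  return files;
};

const parts = splitIntoFiles(["r1", "r2", "r3", "r4", "r5", "r6", "r7"], MAX_ROWS_PER_FILE);
// 3 files: ["r1","r2","r3"], ["r4","r5","r6"], ["r7"]
```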