您的 SvelteKit 應用程序是否在處理髮送電子郵件、調整圖像大小或處理數據等任務時遇到困難?使用 BullMQ,您可以將這些繁重的工作卸載到後台,並保持您的應用程序快如閃電。在這篇文章中,我們將向您展示如何設置它並像專業人士一樣處理現實世界的任務。讓我們開始吧!
tl;dr 在 hooks.server.js 中設置 BullMQ 工作線程。看看例子
BullMQ 是一個 Node.js 庫,用於使用 Redis 創建和管理作業隊列。它可以幫助您有效地在後台運行耗時的任務。憑藉重試、作業調度和並發控制等內置功能,BullMQ 使您在應用程序中處理複雜的工作流程變得簡單可靠。
首先,安裝 ioredis(node.js 的 Redis 客戶端)和 bullmq:
pnpm i -D ioredis bullmq
儘管您可以從像 Vercel 這樣的無服務器環境將作業添加到 bullmq 隊列,但工作線程必須在傳統的長壽命 Node.js 服務器上運行。因此,將adapter-auto替換為adapter-node:
pnpm rm @sveltejs/adapter-auto && pnpm i -D @sveltejs/adapter-node
不要忘記使用新安裝的節點適配器更新您的 Svelte 配置 (svelte.config.js)。
接下來,讓我們設置一個 BullMQ 作業隊列及其處理器。在 src/lib/server/ 目錄下創建 .js 文件:
// src/lib/server/background-jobs.js import { REDIS_URL } from "$env/static/private"; import { Queue, Worker } from "bullmq"; import IORedis from "ioredis"; const Q_NAME = "q"; export const jobsQueue = new Queue(Q_NAME, { connection: new IORedis(REDIS_URL), }); const sleep = (t) => new Promise((resolve) => setTimeout(resolve, t * 100)); export const setupBullMQProcessor = () => { new Worker( Q_NAME, async (job) => { for (let i = 0; i <= 100; i++) { await sleep(Math.random()); await job.updateProgress(i); await job.log(`Processing job at interval ${i}`); if (Math.random() * 200 < 1) throw new Error(`Random error at ${i}`); } return `This is the return value of job (${job.id})`; }, // https://docs.bullmq.io/bull/patterns/persistent-connections#maxretriesperrequest { connection: new IORedis(REDIS_URL, { maxRetriesPerRequest: null }) } ); };
在這裡,我們還創建了一個實用函數,用於實例化 BullMQ 工作線程來偵聽和處理隊列 Q_NAME 中的作業。
我們需要在 hooks.server.js 文件中調用此函數——無論是在頂層還是在 init 鉤子中。
// src/hooks.server.js // ... import { building } from "$app/environment"; import { setupBullMQProcessor } from "$lib/server/background-jobs"; // ... if (!building) { setupBullMQProcessor(); } // ...
!building 檢查會在構建期間跳過設置工作線程(以及 Redis 連接),從而加快進程。
? BullMQ 現在可以在我們的 SvelteKit 應用程序中使用了嗎?
為了測試設置,讓我們創建一個 POST 端點來將作業排入隊列。
// src/routes/+server.ts import { jobsQueue } from "$lib/server/background-jobs"; export const POST = async () => { const { id: jobId } = await jobsQueue.add("job", {}); /* The following code passes the job's progress to the client as a stream. If you don't need to update the client with the progress, you can skip the following. You can also use web-sockets or polling for that. */ const stream = new ReadableStream({ async pull(controller) { const job = await jobsQueue.getJob(jobId); controller.enqueue( JSON.stringify( job.failedReason ? { error: job.failedReason } : job.returnvalue ? { data: job.returnvalue } : { progress: job.progress } ) ); controller.enqueue("\n"); if (job.finishedOn) { controller.close(); } // wait for 1-second before sending the next status update await new Promise((r) => setTimeout(r, 1e3)); }, }); return new Response(stream, { headers: { "content-type": "text/plain" }, }); };
在前端,我們添加一個按鈕來觸發上述端點並隨後顯示作業的狀態:
<!-- src/routes/+page.svelte --> <script> let result = $state(); $inspect(result); const handleClick = async () => { const response = await fetch("/", { method: "post" }); const reader = await response.body.getReader(); while (true) { const { done, value } = await reader.read(); if (done) break; result = JSON.parse(new TextDecoder().decode(value)); } setTimeout(() => (result = undefined), 3e3); }; </script> {#if result?.error} <div> <p>Here is the output:</p> <p><img src="https://img.php.cn/upload/article/000/000/000/173785153552047.jpg" alt="background jobs demo using sveltekit and bullmq" loading="lazy" style="max-width:90%" style="max-width:90%" data-animated="true"></p> <hr> <h3> Bonus ? </h3> <p>You can also mount a bull-board dashboard in your SvelteKit app for easy monitoring of background jobs.</p> <p>Install bull-board dependencies<br> </p> <pre class="brush:php;toolbar:false">pnpm i -D @bull-board/api @bull-board/hono @hono/node-server hono
並修改你的hooks.server.js:
// src/hooks.server.js import { building } from "$app/environment"; import { jobsQueue, setupBullMQProcessor } from "$lib/server/background-jobs"; import { createBullBoard } from "@bull-board/api"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter"; import { HonoAdapter } from "@bull-board/hono"; import { serveStatic } from "@hono/node-server/serve-static"; import { Hono } from "hono"; if (!building) { setupBullMQProcessor(); } const bullboard = (() => { const serverAdapter = new HonoAdapter(serveStatic); createBullBoard({ queues: [new BullMQAdapter(jobsQueue)], serverAdapter, }); const app = new Hono({ strict: false }); const basePath = "/jobs"; serverAdapter.setBasePath(basePath); app.route(basePath, serverAdapter.registerPlugin()); return app; })(); export const handle = async ({ event, resolve }) => { if (event.url.pathname.match(/^\/jobs($|\/)/)) { return bullboard.fetch(event.request); } return resolve(event); };
然後訪問
以上是SvelteKit 中使用 BullMQ 的背景作業的詳細內容。更多資訊請關注PHP中文網其他相關文章!