您的 SvelteKit 应用程序是否在处理发送电子邮件、调整图像大小或处理数据等任务时遇到困难?使用 BullMQ,您可以将这些繁重的工作卸载到后台,并保持您的应用程序快如闪电。在这篇文章中,我们将向您展示如何设置它并像专业人士一样处理现实世界的任务。让我们开始吧!
tl;dr 在 hooks.server.js 中设置 BullMQ 工作线程。看看例子
BullMQ 是一个 Node.js 库,用于使用 Redis 创建和管理作业队列。它可以帮助您有效地在后台运行耗时的任务。凭借重试、作业调度和并发控制等内置功能,BullMQ 使您在应用程序中处理复杂的工作流程变得简单可靠。
首先,安装 ioredis(node.js 的 Redis 客户端)和 bullmq:
pnpm i -D ioredis bullmq
尽管您可以从像 Vercel 这样的无服务器环境将作业添加到 bullmq 队列,但工作线程必须在传统的长寿命 Node.js 服务器上运行。因此,将adapter-auto替换为adapter-node:
pnpm rm @sveltejs/adapter-auto && pnpm i -D @sveltejs/adapter-node
不要忘记使用新安装的节点适配器更新您的 Svelte 配置 (svelte.config.js)。
接下来,让我们设置一个 BullMQ 作业队列及其处理器。在 src/lib/server/ 目录下创建 .js 文件:
// src/lib/server/background-jobs.js import { REDIS_URL } from "$env/static/private"; import { Queue, Worker } from "bullmq"; import IORedis from "ioredis"; const Q_NAME = "q"; export const jobsQueue = new Queue(Q_NAME, { connection: new IORedis(REDIS_URL), }); const sleep = (t) => new Promise((resolve) => setTimeout(resolve, t * 100)); export const setupBullMQProcessor = () => { new Worker( Q_NAME, async (job) => { for (let i = 0; i <= 100; i++) { await sleep(Math.random()); await job.updateProgress(i); await job.log(`Processing job at interval ${i}`); if (Math.random() * 200 < 1) throw new Error(`Random error at ${i}`); } return `This is the return value of job (${job.id})`; }, // https://docs.bullmq.io/bull/patterns/persistent-connections#maxretriesperrequest { connection: new IORedis(REDIS_URL, { maxRetriesPerRequest: null }) } ); };
在这里,我们还创建了一个实用函数,用于实例化 BullMQ 工作线程来侦听和处理队列 Q_NAME 中的作业。
我们需要在 hooks.server.js 文件中调用此函数——无论是在顶层还是在 init 钩子中。
// src/hooks.server.js // ... import { building } from "$app/environment"; import { setupBullMQProcessor } from "$lib/server/background-jobs"; // ... if (!building) { setupBullMQProcessor(); } // ...
!building 检查会在构建期间跳过设置工作线程(以及 Redis 连接),从而加快进程。
? BullMQ 现在可以在我们的 SvelteKit 应用程序中使用了吗?
为了测试设置,让我们创建一个 POST 端点来将作业排入队列。
// src/routes/+server.ts import { jobsQueue } from "$lib/server/background-jobs"; export const POST = async () => { const { id: jobId } = await jobsQueue.add("job", {}); /* The following code passes the job's progress to the client as a stream. If you don't need to update the client with the progress, you can skip the following. You can also use web-sockets or polling for that. */ const stream = new ReadableStream({ async pull(controller) { const job = await jobsQueue.getJob(jobId); controller.enqueue( JSON.stringify( job.failedReason ? { error: job.failedReason } : job.returnvalue ? { data: job.returnvalue } : { progress: job.progress } ) ); controller.enqueue("\n"); if (job.finishedOn) { controller.close(); } // wait for 1-second before sending the next status update await new Promise((r) => setTimeout(r, 1e3)); }, }); return new Response(stream, { headers: { "content-type": "text/plain" }, }); };
在前端,我们添加一个按钮来触发上述端点并随后显示作业的状态:
<!-- src/routes/+page.svelte --> <script> let result = $state(); $inspect(result); const handleClick = async () => { const response = await fetch("/", { method: "post" }); const reader = await response.body.getReader(); while (true) { const { done, value } = await reader.read(); if (done) break; result = JSON.parse(new TextDecoder().decode(value)); } setTimeout(() => (result = undefined), 3e3); }; </script> {#if result?.error} <div> <p>Here is the output:</p> <p><img src="https://img.php.cn/upload/article/000/000/000/173785153552047.jpg" alt="background jobs demo using sveltekit and bullmq" loading="lazy" style="max-width:90%" style="max-width:90%" data-animated="true"></p> <hr> <h3> Bonus ? </h3> <p>You can also mount a bull-board dashboard in your SvelteKit app for easy monitoring of background jobs.</p> <p>Install bull-board dependencies<br> </p> <pre class="brush:php;toolbar:false">pnpm i -D @bull-board/api @bull-board/hono @hono/node-server hono
并修改你的hooks.server.js:
// src/hooks.server.js import { building } from "$app/environment"; import { jobsQueue, setupBullMQProcessor } from "$lib/server/background-jobs"; import { createBullBoard } from "@bull-board/api"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter"; import { HonoAdapter } from "@bull-board/hono"; import { serveStatic } from "@hono/node-server/serve-static"; import { Hono } from "hono"; if (!building) { setupBullMQProcessor(); } const bullboard = (() => { const serverAdapter = new HonoAdapter(serveStatic); createBullBoard({ queues: [new BullMQAdapter(jobsQueue)], serverAdapter, }); const app = new Hono({ strict: false }); const basePath = "/jobs"; serverAdapter.setBasePath(basePath); app.route(basePath, serverAdapter.registerPlugin()); return app; })(); export const handle = async ({ event, resolve }) => { if (event.url.pathname.match(/^\/jobs($|\/)/)) { return bullboard.fetch(event.request); } return resolve(event); };
然后访问
以上是Sveltekit的背景工作与BullMQ的详细内容。更多信息请关注PHP中文网其他相关文章!