首页 >web前端 >js教程 >Sveltekit的背景工作与BullMQ

Sveltekit的背景工作与BullMQ

Linda Hamilton
Linda Hamilton原创
2025-01-26 08:32:11212浏览

您的 SvelteKit 应用程序是否在处理发送电子邮件、调整图像大小或处理数据等任务时遇到困难?使用 BullMQ,您可以将这些繁重的工作卸载到后台,并保持您的应用程序快如闪电。在这篇文章中,我们将向您展示如何设置它并像专业人士一样处理现实世界的任务。让我们开始吧!

tl;dr 在 hooks.server.js 中设置 BullMQ 工作线程。看看例子

什么是 BullMQ?

BullMQ 是一个 Node.js 库,用于使用 Redis 创建和管理作业队列。它可以帮助您有效地在后台运行耗时的任务。凭借重试、作业调度和并发控制等内置功能,BullMQ 使您在应用程序中处理复杂的工作流程变得简单可靠。

第1步:安装依赖项

首先,安装 ioredis(node.js 的 Redis 客户端)和 bullmq:

pnpm i -D ioredis bullmq

尽管您可以从像 Vercel 这样的无服务器环境将作业添加到 bullmq 队列,但工作线程必须在传统的长寿命 Node.js 服务器上运行。因此,将adapter-auto替换为adapter-node:

pnpm rm @sveltejs/adapter-auto && pnpm i -D @sveltejs/adapter-node

不要忘记使用新安装的节点适配器更新您的 Svelte 配置 (svelte.config.js)。

第 2 步:设置作业队列和处理器

接下来,让我们设置一个 BullMQ 作业队列及其处理器。在 src/lib/server/ 目录下创建 .js 文件:

// src/lib/server/background-jobs.js

import { REDIS_URL } from "$env/static/private";
import { Queue, Worker } from "bullmq";
import IORedis from "ioredis";

const Q_NAME = "q";

export const jobsQueue = new Queue(Q_NAME, {
  connection: new IORedis(REDIS_URL),
});

const sleep = (t) => new Promise((resolve) => setTimeout(resolve, t * 100));

export const setupBullMQProcessor = () => {
  new Worker(
    Q_NAME,
    async (job) => {
      for (let i = 0; i <= 100; i++) {
        await sleep(Math.random());
        await job.updateProgress(i);
        await job.log(`Processing job at interval ${i}`);

        if (Math.random() * 200 < 1) throw new Error(`Random error at ${i}`);
      }

      return `This is the return value of job (${job.id})`;
    },
    // https://docs.bullmq.io/bull/patterns/persistent-connections#maxretriesperrequest
    { connection: new IORedis(REDIS_URL, { maxRetriesPerRequest: null }) }
  );
};

在这里,我们还创建了一个实用函数,用于实例化 BullMQ 工作线程来侦听和处理队列 Q_NAME 中的作业。

我们需要在 hooks.server.js 文件中调用此函数——无论是在顶层还是在 init 钩子中。

// src/hooks.server.js

// ...
import { building } from "$app/environment";
import { setupBullMQProcessor } from "$lib/server/background-jobs";
// ...
if (!building) {
  setupBullMQProcessor();
}
// ...

!building 检查会在构建期间跳过设置工作线程(以及 Redis 连接),从而加快进程。

? BullMQ 现在可以在我们的 SvelteKit 应用程序中使用了吗?

演示时间

为了测试设置,让我们创建一个 POST 端点来将作业排入队列。

// src/routes/+server.ts

import { jobsQueue } from "$lib/server/background-jobs";

export const POST = async () => {
  const { id: jobId } = await jobsQueue.add("job", {});

  /*
  The following code passes the job's progress to the client as a stream.
  If you don't need to update the client with the progress, you can skip
  the following. You can also use web-sockets or polling for that.
  */
  const stream = new ReadableStream({
    async pull(controller) {
      const job = await jobsQueue.getJob(jobId);
      controller.enqueue(
        JSON.stringify(
          job.failedReason
            ? { error: job.failedReason }
            : job.returnvalue
            ? { data: job.returnvalue }
            : { progress: job.progress }
        )
      );
      controller.enqueue("\n");

      if (job.finishedOn) {
        controller.close();
      }

      // wait for 1-second before sending the next status update
      await new Promise((r) => setTimeout(r, 1e3));
    },
  });

  return new Response(stream, {
    headers: { "content-type": "text/plain" },
  });
};

在前端,我们添加一个按钮来触发上述端点并随后显示作业的状态:

<!-- src/routes/+page.svelte -->

<script>
  let result = $state();

  $inspect(result);

  const handleClick = async () => {
    const response = await fetch("/", { method: "post" });
    const reader = await response.body.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      result = JSON.parse(new TextDecoder().decode(value));
    }

    setTimeout(() => (result = undefined), 3e3);
  };
</script>

{#if result?.error}
  <div>



<p>Here is the output:</p>

<p><img src="https://img.php.cn/upload/article/000/000/000/173785153552047.jpg" alt="background jobs demo using sveltekit and bullmq" loading="lazy"    style="max-width:90%"  style="max-width:90%" data-animated="true"></p>


<hr>

<h3>
  
  
  Bonus ?
</h3>

<p>You can also mount a bull-board dashboard in your SvelteKit app for easy monitoring of background jobs.</p>

<p>Install bull-board dependencies<br>
</p>

<pre class="brush:php;toolbar:false">pnpm i -D @bull-board/api @bull-board/hono @hono/node-server hono

并修改你的hooks.server.js:

// src/hooks.server.js

import { building } from "$app/environment";
import { jobsQueue, setupBullMQProcessor } from "$lib/server/background-jobs";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { HonoAdapter } from "@bull-board/hono";
import { serveStatic } from "@hono/node-server/serve-static";
import { Hono } from "hono";

if (!building) {
  setupBullMQProcessor();
}

const bullboard = (() => {
  const serverAdapter = new HonoAdapter(serveStatic);

  createBullBoard({
    queues: [new BullMQAdapter(jobsQueue)],
    serverAdapter,
  });
  const app = new Hono({ strict: false });
  const basePath = "/jobs";
  serverAdapter.setBasePath(basePath);
  app.route(basePath, serverAdapter.registerPlugin());

  return app;
})();

export const handle = async ({ event, resolve }) => {
  if (event.url.pathname.match(/^\/jobs($|\/)/)) {
    return bullboard.fetch(event.request);
  }

  return resolve(event);
};

然后访问 /jobs 查看牛板仪表板

bull-board dashboard

以上是Sveltekit的背景工作与BullMQ的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn