首頁 >web前端 >js教程 >SvelteKit 中使用 BullMQ 的背景作業

SvelteKit 中使用 BullMQ 的背景作業

Linda Hamilton
Linda Hamilton原創
2025-01-26 08:32:11212瀏覽

您的 SvelteKit 應用程序是否在處理髮送電子郵件、調整圖像大小或處理數據等任務時遇到困難?使用 BullMQ,您可以將這些繁重的工作卸載到後台,並保持您的應用程序快如閃電。在這篇文章中,我們將向您展示如何設置它並像專業人士一樣處理現實世界的任務。讓我們開始吧!

tl;dr 在 hooks.server.js 中設置 BullMQ 工作線程。看看例子

什麼是 BullMQ?

BullMQ 是一個 Node.js 庫,用於使用 Redis 創建和管理作業隊列。它可以幫助您有效地在後台運行耗時的任務。憑藉重試、作業調度和並發控制等內置功能,BullMQ 使您在應用程序中處理複雜的工作流程變得簡單可靠。

第1步:安裝依賴項

首先,安裝 ioredis(node.js 的 Redis 客戶端)和 bullmq:

pnpm i -D ioredis bullmq

儘管您可以從像 Vercel 這樣的無服務器環境將作業添加到 bullmq 隊列,但工作線程必須在傳統的長壽命 Node.js 服務器上運行。因此,將adapter-auto替換為adapter-node:

pnpm rm @sveltejs/adapter-auto && pnpm i -D @sveltejs/adapter-node

不要忘記使用新安裝的節點適配器更新您的 Svelte 配置 (svelte.config.js)。

第 2 步:設置作業隊列和處理器

接下來,讓我們設置一個 BullMQ 作業隊列及其處理器。在 src/lib/server/ 目錄下創建 .js 文件:

// src/lib/server/background-jobs.js

import { REDIS_URL } from "$env/static/private";
import { Queue, Worker } from "bullmq";
import IORedis from "ioredis";

const Q_NAME = "q";

export const jobsQueue = new Queue(Q_NAME, {
  connection: new IORedis(REDIS_URL),
});

const sleep = (t) => new Promise((resolve) => setTimeout(resolve, t * 100));

export const setupBullMQProcessor = () => {
  new Worker(
    Q_NAME,
    async (job) => {
      for (let i = 0; i <= 100; i++) {
        await sleep(Math.random());
        await job.updateProgress(i);
        await job.log(`Processing job at interval ${i}`);

        if (Math.random() * 200 < 1) throw new Error(`Random error at ${i}`);
      }

      return `This is the return value of job (${job.id})`;
    },
    // https://docs.bullmq.io/bull/patterns/persistent-connections#maxretriesperrequest
    { connection: new IORedis(REDIS_URL, { maxRetriesPerRequest: null }) }
  );
};

在這裡,我們還創建了一個實用函數,用於實例化 BullMQ 工作線程來偵聽和處理隊列 Q_NAME 中的作業。

我們需要在 hooks.server.js 文件中調用此函數——無論是在頂層還是在 init 鉤子中。

// src/hooks.server.js

// ...
import { building } from "$app/environment";
import { setupBullMQProcessor } from "$lib/server/background-jobs";
// ...
if (!building) {
  setupBullMQProcessor();
}
// ...

!building 檢查會在構建期間跳過設置工作線程(以及 Redis 連接),從而加快進程。

? BullMQ 現在可以在我們的 SvelteKit 應用程序中使用了嗎?

演示時間

為了測試設置,讓我們創建一個 POST 端點來將作業排入隊列。

// src/routes/+server.ts

import { jobsQueue } from "$lib/server/background-jobs";

export const POST = async () => {
  const { id: jobId } = await jobsQueue.add("job", {});

  /*
  The following code passes the job's progress to the client as a stream.
  If you don't need to update the client with the progress, you can skip
  the following. You can also use web-sockets or polling for that.
  */
  const stream = new ReadableStream({
    async pull(controller) {
      const job = await jobsQueue.getJob(jobId);
      controller.enqueue(
        JSON.stringify(
          job.failedReason
            ? { error: job.failedReason }
            : job.returnvalue
            ? { data: job.returnvalue }
            : { progress: job.progress }
        )
      );
      controller.enqueue("\n");

      if (job.finishedOn) {
        controller.close();
      }

      // wait for 1-second before sending the next status update
      await new Promise((r) => setTimeout(r, 1e3));
    },
  });

  return new Response(stream, {
    headers: { "content-type": "text/plain" },
  });
};

在前端,我們添加一個按鈕來觸發上述端點並隨後顯示作業的狀態:

<!-- src/routes/+page.svelte -->

<script>
  let result = $state();

  $inspect(result);

  const handleClick = async () => {
    const response = await fetch("/", { method: "post" });
    const reader = await response.body.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      result = JSON.parse(new TextDecoder().decode(value));
    }

    setTimeout(() => (result = undefined), 3e3);
  };
</script>

{#if result?.error}
  <div>



<p>Here is the output:</p>

<p><img src="https://img.php.cn/upload/article/000/000/000/173785153552047.jpg" alt="background jobs demo using sveltekit and bullmq" loading="lazy"    style="max-width:90%"  style="max-width:90%" data-animated="true"></p>


<hr>

<h3>
  
  
  Bonus ?
</h3>

<p>You can also mount a bull-board dashboard in your SvelteKit app for easy monitoring of background jobs.</p>

<p>Install bull-board dependencies<br>
</p>

<pre class="brush:php;toolbar:false">pnpm i -D @bull-board/api @bull-board/hono @hono/node-server hono

並修改你的hooks.server.js:

// src/hooks.server.js

import { building } from "$app/environment";
import { jobsQueue, setupBullMQProcessor } from "$lib/server/background-jobs";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { HonoAdapter } from "@bull-board/hono";
import { serveStatic } from "@hono/node-server/serve-static";
import { Hono } from "hono";

if (!building) {
  setupBullMQProcessor();
}

const bullboard = (() => {
  const serverAdapter = new HonoAdapter(serveStatic);

  createBullBoard({
    queues: [new BullMQAdapter(jobsQueue)],
    serverAdapter,
  });
  const app = new Hono({ strict: false });
  const basePath = "/jobs";
  serverAdapter.setBasePath(basePath);
  app.route(basePath, serverAdapter.registerPlugin());

  return app;
})();

export const handle = async ({ event, resolve }) => {
  if (event.url.pathname.match(/^\/jobs($|\/)/)) {
    return bullboard.fetch(event.request);
  }

  return resolve(event);
};

然後訪問 /jobs 查看牛板儀表板

bull-board dashboard

以上是SvelteKit 中使用 BullMQ 的背景作業的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn