ホームページ  >  記事  >  ウェブフロントエンド  >  マイクロサービス アーキテクチャで Kafka と Node.js を実装する

マイクロサービス アーキテクチャで Kafka と Node.js を実装する

王林
王林オリジナル
2024-08-10 07:02:031177ブラウズ

Implement Kafka and Node.js in Microservice Architecture

イベント駆動型アプリケーション用のマイクロサービス アーキテクチャを設計する場合、Apache Kafka と Node.js を統合すると、リアルタイム データ処理機能が大幅に強化されます。 。この記事では、Kafka Node.js 統合を活用して、ストリーミング データを効率的に処理する堅牢でスケーラブルなマイクロサービスを構築する方法を検討します。

マイクロサービス アーキテクチャで Apache Kafka を使用する理由

マイクロサービス アーキテクチャでは、サービスは相互に効率的に通信する必要があります。 Apache Kafka は、マイクロサービス間のリアルタイムのデータ交換を可能にする分散イベント ストリーミング プラットフォームとして機能します。サービスを分離し、大量のデータを処理しながら独立して動作できるようにします。

イベント駆動型アプリケーションにおける Kafka の利点

  • スケーラビリティ: Kafka の分散アーキテクチャは水平スケーリングをサポートしており、イベント駆動型アプリケーションでのリアルタイム データ処理に最適です。
  • フォールト トレランス: Kafka は、障害が発生した場合でもデータが確実に配信されることを保証します。
  • 高スループット: Kafka は 1 秒あたり数百万のイベントを処理でき、要求の厳しいマイクロサービス アプリケーションに高スループットを提供します。

Kafka Node.js 統合のセットアップ

マイクロサービス環境で Apache Kafka と Node.js を統合するには、Kafka をメッセージ ブローカーとして設定し、Node.js サービスに接続する必要があります。ステップバイステップのガイドは次のとおりです:

Kafka と Node.js をインストールする

まず、Apache KafkaNode.js がシステムにインストールされていることを確認します。次の記事に従って Kafka と Node.js をインストールできます:

  • Node.js の概要
  • Apache Kafka を始める
  • Apache Kafka を Node.js と統合する方法

Kafka Node.js クライアント ライブラリをインストールする

Node.jsKafka に接続するには、Node.js 用の人気のある Kafka クライアントである kafkajs ライブラリを使用できます。

npm install kafkajs

Node.js で Kafka プロデューサーを作成する

マイクロサービス アーキテクチャでは、Kafka プロデューサーは Kafka トピックにメッセージを送信する責任があります。以下は、Node.js で Kafka プロデューサーを作成する方法の簡単な例です:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-producer',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

const sendMessage = async () => {
  await producer.connect();
  await producer.send({
    topic: 'my-topic',
    messages: [
      { value: 'Hello Kafka' },
    ],
  });
  await producer.disconnect();
};

sendMessage().catch(console.error);

Node.js で Kafka コンシューマーを作成する

Kafka コンシューマーは、Kafka トピックからメッセージを読み取るために使用されます。コンシューマを作成する方法は次のとおりです:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

const runConsumer = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

runConsumer().catch(console.error);

ケーススタディ

マイクロサービス アーキテクチャにおける Kafka と Node.js の統合を説明するために、次のケース スタディを検討してください。

シナリオ

2 つのマイクロサービスがあります:

  1. 注文サービス: 顧客の注文を処理します。
  2. 製品サービス: 製品の在庫を管理します。

注文サービスで購入または取引が発生すると、製品サービスの在庫が更新されます。 Kafka は、メッセージ ブローカーとして機能することでこの通信を促進します。

実装

  1. 注文サービス: 注文イベントを製品更新トピックに公開します。
  2. インベントリ サービス: 製品更新トピックからのメッセージを消費し、それに応じてインベントリを更新します。

オーダーサービスプロデューサースクリプト

注文サービスは、注文書を処理し、在庫を更新するために製品サービスにメッセージを送信する責任があります。 Kafka プロデューサーとして Order Service を実装する方法は次のとおりです。

// orderService.js
const express = require('express');
const { Kafka } = require('kafkajs');

// Kafka producer configuration
const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer();

// Initialize Express app
const app = express();
app.use(express.json());

const placeOrder = async (orderId, productId, quantity) => {
  await producer.connect();
  const orderEvent = {
    orderId,
    productId,
    quantity,
    eventType: 'ORDER_PLACED',
    timestamp: Date.now(),
  };
  await producer.send({
    topic: 'product-updates',
    messages: [{ value: JSON.stringify(orderEvent) }],
  });
  await producer.disconnect();
  console.log(`Order placed: ${orderId} for product: ${productId}`);
};

// API endpoint to place an order
app.post('/order', async (req, res) => {
  const { orderId, productId, quantity } = req.body;

  if (!orderId || !productId || !quantity) {
    return res.status(400).json({ error: 'Missing orderId, productId, or quantity' });
  }

  try {
    await placeOrder(orderId, productId, quantity);
    res.status(200).json({ message: `Order ${orderId} placed successfully.` });
  } catch (error) {
    console.error('Error placing order:', error);
    res.status(500).json({ error: 'Failed to place order' });
  }
});

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Order Service API running on port ${PORT}`);
});

製品サービスコンシューマスクリプト

Product Service は、product-updates Kafka トピックからのメッセージを消費し、それに応じて製品在庫を更新します。実装は次のとおりです:

// productService.js
const express = require('express');
const { Kafka } = require('kafkajs');

// Kafka consumer configuration
const kafka = new Kafka({
  clientId: 'product-service',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'product-group' });

// Initialize Express app
const app = express();
app.use(express.json());

const updateStock = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'product-updates', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const orderEvent = JSON.parse(message.value.toString());
      console.log(`Received order: ${orderEvent.orderId}, Product: ${orderEvent.productId}, Quantity: ${orderEvent.quantity}`);

      // Simulate stock update
      console.log(`Updating stock for product: ${orderEvent.productId}`);
      // logic to update stock
    },
  });
};

// Start the Product Service to listen for messages
updateStock().catch(console.error);

// Start the server
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
  console.log(`Product Service API running on port ${PORT}`);
});

受信メッセージをリッスンする必要があるため、最初に 製品サービス を開始します。

node productService.js

製品サービスは、ポート 3001 (または指定されている場合は別のポート) でのリッスンを開始します。

次のコマンドで 注文サービス を開始します:

node orderService.js

オーダー サービスは、ポート 3000 (または指定されている場合は別のポート) で利用可能になります。

POST リクエストを Order Service API:
に送信することで注文できます。

curl -X POST http://localhost:3000/order \
-H "Content-Type: application/json" \
-d '{
  "orderId": "order-789",
  "productId": "product-123",
  "quantity": 5
}'

When an order is placed, the Order Service will send a Kafka message, and the Product Service will consume that message to update the stock:

Received order: order-789, Product: product-123, Quantity: 5
Updating stock for product: product-123

Conclusion

Integrating Apache Kafka and Node.js in your microservices architecture allows you to build highly scalable and resilient event-driven applications.

By following best practices and leveraging Kafka’s powerful features, you can efficiently process real-time data and create a robust communication layer between your microservices.

以上がマイクロサービス アーキテクチャで Kafka と Node.js を実装するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。