検索
ホームページJava&#&チュートリアルSpring Boot で RocketMQ を使用してバッチ メッセージ消費を実装する方法

How to Implement Batch Message Consumption with RocketMQ in Spring Boot

1. 依存関係の追加

まず、必要な依存関係を pom.xml ファイルに追加します。

<!-- RocketMQ Spring Boot dependency for Spring Boot 3 -->
<dependency>
    <groupid>org.apache.rocketmq</groupid>
    <artifactid>rocketmq-spring-boot-starter</artifactid>
    <version>2.3.1</version>
    <exclusions>
        <exclusion>
            <groupid>org.apache.rocketmq</groupid>
            <artifactid>rocketmq-client</artifactid>
        </exclusion>
    </exclusions>
</dependency>

<!-- Dependency compatible with MQ cluster version 5.3.0 -->
<dependency>
    <groupid>org.apache.rocketmq</groupid>
    <artifactid>rocketmq-client</artifactid>
    <version>5.3.0</version>
</dependency>

2. 設定ファイル bootstrap.yaml

bootstrap.yaml ファイルで RocketMQ 設定を構成します。

rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50  # Max messages per batch
    pull-batch-size: 100  # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1"  # Use different groups for different business processes

3. 構成クラス MqConfigProperties

構成クラス MqConfigProperties を作成します:

import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

import java.io.Serializable;

/**
 * RocketMQ Configuration Class
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private TopicProperties topics;
    private GroupProperties groups;

    /**
     * Topic Configuration Class
     */
    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    /**
     * Consumer Group Configuration Class
     */
    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}

4. 消費者規範の実施

コンシューマ クラス UserConsumer を作成します:

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;

/**
 * Batch Consumer Implementation
 */
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private ApplicationContext applicationContext;

    private volatile boolean running;
    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("Consumer is already running");
        }
        initConsumer();
        setRunning(true);
        log.info("UserConsumer started successfully.");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer stopped.");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);

        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(100);  // Set the batch size for consumption
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<messageext> msgs, ConsumeConcurrentlyContext context) {
                log.info("Received {} messages", msgs.size());
                for (MessageExt message : msgs) {
                    String body = new String(message.getBody());
                    log.info("Processing message: {}", body);
                    User user = JSONObject.parseObject(body, User.class);
                    processUser(user);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
    }

    private void processUser(User user) {
        log.info("Processing user with ID: {}", user.getId());
        // Handle user-related business logic
    }
}
</messageext>

5. プロデューサーのサンプルコード

バッチ メッセージを生成するには、次の UserProducer クラスを使用できます:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

public class UserProducer {

    private DefaultMQProducer producer;

    public void sendBatchMessages(List<user> users, String topic) {
        List<message> messages = new ArrayList();
        for (User user : users) {
            messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}
</message></user>

6. 追加の最適化に関する提案

  • パフォーマンスの最適化: コンシューマ スレッド プールのサイズを調整できます。デフォルトでは、consumptionThreadMin=20 および ConsumerThreadMax=20 に設定されています。同時実行性の高いシナリオでは、スレッド プールのサイズを増やすとパフォーマンスが向上します。

  • エラー処理: 消費が失敗した場合は、無限再試行ループを避けるために RECONSUME_LATER に注意してください。ビジネス要件に基づいて最大再試行回数を設定します。

  • テナント分離: 間違ったグループからのデータの消費を避けるために、ビジネス モジュールごとに異なるグループを使用します。これは実稼働環境では特に重要です。

以上がSpring Boot で RocketMQ を使用してバッチ メッセージ消費を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
高度なJavaプロジェクト管理、自動化の構築、依存関係の解像度にMavenまたはGradleを使用するにはどうすればよいですか?高度なJavaプロジェクト管理、自動化の構築、依存関係の解像度にMavenまたはGradleを使用するにはどうすればよいですか?Mar 17, 2025 pm 05:46 PM

この記事では、Javaプロジェクト管理、自動化の構築、依存関係の解像度にMavenとGradleを使用して、アプローチと最適化戦略を比較して説明します。

適切なバージョン化と依存関係管理を備えたカスタムJavaライブラリ(JARファイル)を作成および使用するにはどうすればよいですか?適切なバージョン化と依存関係管理を備えたカスタムJavaライブラリ(JARファイル)を作成および使用するにはどうすればよいですか?Mar 17, 2025 pm 05:45 PM

この記事では、MavenやGradleなどのツールを使用して、適切なバージョン化と依存関係管理を使用して、カスタムJavaライブラリ(JARファイル)の作成と使用について説明します。

カフェインやグアバキャッシュなどのライブラリを使用して、Javaアプリケーションにマルチレベルキャッシュを実装するにはどうすればよいですか?カフェインやグアバキャッシュなどのライブラリを使用して、Javaアプリケーションにマルチレベルキャッシュを実装するにはどうすればよいですか?Mar 17, 2025 pm 05:44 PM

この記事では、カフェインとグアバキャッシュを使用してJavaでマルチレベルキャッシュを実装してアプリケーションのパフォーマンスを向上させています。セットアップ、統合、パフォーマンスの利点をカバーし、構成と立ち退きポリシー管理Best Pra

キャッシュや怠zyなロードなどの高度な機能を備えたオブジェクトリレーショナルマッピングにJPA(Java Persistence API)を使用するにはどうすればよいですか?キャッシュや怠zyなロードなどの高度な機能を備えたオブジェクトリレーショナルマッピングにJPA(Java Persistence API)を使用するにはどうすればよいですか?Mar 17, 2025 pm 05:43 PM

この記事では、キャッシュや怠zyなロードなどの高度な機能を備えたオブジェクトリレーショナルマッピングにJPAを使用することについて説明します。潜在的な落とし穴を強調しながら、パフォーマンスを最適化するためのセットアップ、エンティティマッピング、およびベストプラクティスをカバーしています。[159文字]

Javaのクラスロードメカニズムは、さまざまなクラスローダーやその委任モデルを含むどのように機能しますか?Javaのクラスロードメカニズムは、さまざまなクラスローダーやその委任モデルを含むどのように機能しますか?Mar 17, 2025 pm 05:35 PM

Javaのクラスロードには、ブートストラップ、拡張機能、およびアプリケーションクラスローダーを備えた階層システムを使用して、クラスの読み込み、リンク、および初期化が含まれます。親の委任モデルは、コアクラスが最初にロードされ、カスタムクラスのLOAに影響を与えることを保証します

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

Dreamweaver Mac版

Dreamweaver Mac版

ビジュアル Web 開発ツール

EditPlus 中国語クラック版

EditPlus 中国語クラック版

サイズが小さく、構文の強調表示、コード プロンプト機能はサポートされていません

AtomエディタMac版ダウンロード

AtomエディタMac版ダウンロード

最も人気のあるオープンソースエディター

VSCode Windows 64 ビットのダウンロード

VSCode Windows 64 ビットのダウンロード

Microsoft によって発売された無料で強力な IDE エディター

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)