搜索
首页Javajava教程SpringBoot怎么整合Pulsar

一、添加pom.xml依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.0</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.10.0</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

二、Pulsar 参数类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:32
 * @Description: Pulsar 参数类
 */

@Component
@ConfigurationProperties(prefix = "tdmq.pulsar")
@Data
public class PulsarProperties {

    /**
     * 接入地址
     */
    private String serviceurl;

    /**
     * 命名空间tdc
     */
    private String tdcNamespace;

    /**
     * 角色tdc的token
     */
    private String tdcToken;

    /**
     * 集群name
     */
    private String cluster;

    /**
     * topicMap
     */
    private Map<String, String> topicMap;

    /**
     * 订阅
     */
    private Map<String, String> subMap;

    /**
     * 开关 on:Consumer可用 ||||| off:Consumer断路
     */
    private String onOff;
}

三、Pulsar 配置类

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:33
 * @Description: Pulsar 配置类
 */

@Configuration
@EnableConfigurationProperties(PulsarProperties.class)
public class PulsarConfig {

    @Autowired
    PulsarProperties pulsarProperties;

    @Bean
    public PulsarClient getPulsarClient() {

        try {
            return PulsarClient.builder()
                    .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))
                    .serviceUrl(pulsarProperties.getServiceurl())
                    .build();
        } catch (PulsarClientException e) {
            System.out.println(e);
            throw new RuntimeException("初始化Pulsar Client失败");
        }
    }

}

四、不同消费数据类型的监听器

import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:37
 * @Description:
 */

@Component
public class UserMessageListener implements MessageListener<User> {

    @Override
    public void received(Consumer<User> consumer, Message<User> msg) {
        try {
            User user = msg.getValue();
            System.out.println(user);
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    }
}
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:37
 * @Description:
 */

@Component
public class StringMessageListener implements MessageListener<String> {

    @Override
    public void received(Consumer<String> consumer, Message<String> msg) {
        try {
            System.out.println(msg.getValue());
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    }
}

五、Pulsar的核心服务类

import com.yibo.pulsar.common.listener.StringMessageListener;
import com.yibo.pulsar.common.listener.UserMessageListener;
import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:35
 * @Description: Pulsar的核心服务类
 */

@Component
public class PulsarCommon {

    @Autowired
    private PulsarProperties pulsarProperties;

    @Autowired
    private PulsarClient client;

    @Autowired
    private UserMessageListener userMessageListener;

    @Autowired
    private StringMessageListener stringMessageListener;


    /**
     * 创建一个生产者 
     * @param topic     topic name
     * @param schema    schema方式
     * @param <T>       泛型
     * @return          Producer生产者
     */
    public <T> Producer<T> createProducer(String topic, Schema<T> schema) {

        try {
            return client.newProducer(schema)
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                    .sendTimeout(10, TimeUnit.SECONDS)
                    .blockIfQueueFull(true)
                    .create();
        } catch (PulsarClientException e) {
            throw new RuntimeException("初始化Pulsar Producer失败");
        }
    }


    /**
     * 
     * @param topic             topic name
     * @param subscription      sub name
     * @param messageListener   MessageListener的自定义实现类
     * @param schema            schema消费方式
     * @param <T>               泛型
     * @return                  Consumer消费者
     */
    public <T> Consumer<T> createConsumer(String topic, String subscription,
                                   MessageListener<T> messageListener, Schema<T> schema) {
        try {
            return client.newConsumer(schema)
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .subscriptionName(subscription)
                    .ackTimeout(10, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Shared)
                    .messageListener(messageListener)
                    .subscribe();
        } catch (PulsarClientException e) {
            throw new RuntimeException("初始化Pulsar Consumer失败");
        }
    }

    
    /**
     * 异步发送一条消息
     * @param message       消息体
     * @param producer      生产者实例
     * @param <T>           消息泛型
     */
    public <T> void sendAsyncMessage(T message, Producer<T> producer) {
        producer.sendAsync(message).thenAccept(msgId -> {
        });
    }
    
    
    /**
     * 同步发送一条消息
     * @param message       消息体
     * @param producer      生产者实例
     * @param <T>           泛型
     * @throws PulsarClientException
     */
    public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException {
        MessageId send = producer.send(message);
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println(send);
    }

    
    //-----------consumer-----------
    @Bean(name = "comment-publish-topic-consumer")
    public Consumer<String> getCommentPublishTopicConsumer() {
        return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"),
                pulsarProperties.getSubMap().get("comment-publish-topic-test"),
                stringMessageListener, Schema.STRING);
    }


    @Bean(name = "reply-publish-topic-consumer")
    public Consumer<User> getReplyPublishTopicConsumer() {
        return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"),
                pulsarProperties.getSubMap().get("reply-publish-topic-test"),
                userMessageListener, AvroSchema.of(User.class));
    }


    //-----------producer-----------
    @Bean(name = "comment-publish-topic-producer")
    public Producer<String> getCommentPublishTopicProducer() {
        return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"),Schema.STRING);
    }


    @Bean(name = "reply-publish-topic-producer")
    public Producer<User> getReplyPublishTopicProducer() {
        return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class));
    }
}

六、Pulsar整合Spring Cloud

后来发现如上代码会导致BUG-> 在更新Nacos配置之后 Consumer会挂掉
经排查发现结果是由于@RefreshScope注解导致,此注解将摧毁Bean,PulsarConsumer和Producer都将被摧毁,只是说Producer将在下⼀次调⽤中完成重启,Consumer则不能重启,因为没有调⽤,那么怎么解决呢?

就是发布系列事件以刷新容器

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @Author: huangyibo
 * @Date: 2022/5/28 2:34
 * @Description:
 */

@Component
@Slf4j
public class RefreshPulsarListener implements ApplicationListener {

    @Autowired
    ApplicationContext applicationContext;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {

        if (event.getSource().equals("__refreshAll__")) {
            log.info("Nacos配置中心配置修改 重启Pulsar====================================");
            log.info("重启PulsarClient,{}", applicationContext.getBean("getPulsarClient"));
            log.info("重启PulsarConsumer,{}", applicationContext.getBean("comment-publish-topic-consumer"));
            log.info("重启PulsarConsumer,{}", applicationContext.getBean("reply-publish-topic-consumer"));
        }
    }

}

以上是SpringBoot怎么整合Pulsar的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文转载于:亿速云。如有侵权,请联系admin@php.cn删除
Java开发的哪些方面取决于平台?Java开发的哪些方面取决于平台?Apr 26, 2025 am 12:19 AM

JavadevelovermentIrelyPlatForm-DeTueTososeVeralFactors.1)JVMVariationsAffectPerformanceNandBehaviorAcroSsdifferentos.2)Nativelibrariesviajnijniiniininiinniinindrododerplatefform.3)

在不同平台上运行Java代码时是否存在性能差异?为什么?在不同平台上运行Java代码时是否存在性能差异?为什么?Apr 26, 2025 am 12:15 AM

Java代码在不同平台上运行时会有性能差异。1)JVM的实现和优化策略不同,如OracleJDK和OpenJDK。2)操作系统的特性,如内存管理和线程调度,也会影响性能。3)可以通过选择合适的JVM、调整JVM参数和代码优化来提升性能。

Java平台独立性有什么局限性?Java平台独立性有什么局限性?Apr 26, 2025 am 12:10 AM

Java'splatFormentenceHaslimitations不包括PerformanceOverhead,versionCompatibilityIsissues,挑战WithnativelibraryIntegration,Platform-SpecificFeatures,andjvminstallation/jvminstallation/jvmintenance/jeartenance.therefactorscomplicatorscomplicatethe“ writeOnce”

解释平台独立性和跨平台发展之间的差异。解释平台独立性和跨平台发展之间的差异。Apr 26, 2025 am 12:08 AM

PlatformIndependendecealLowsProgramStormonanyPlograwsStormanyPlatFormWithOutModification,而LileCross-PlatFormDevelopmentRequiredquiresMomePlatform-specificAdjustments.platFormIndependence,EneblesuniveByjava,EnablesuniversUniversAleversalexecutionbutmayCotutionButMayComproMisePerformance.cross.cross.cross-platformd

即时(JIT)汇编如何影响Java的性能和平台独立性?即时(JIT)汇编如何影响Java的性能和平台独立性?Apr 26, 2025 am 12:02 AM

JITcompilationinJavaenhancesperformancewhilemaintainingplatformindependence.1)Itdynamicallytranslatesbytecodeintonativemachinecodeatruntime,optimizingfrequentlyusedcode.2)TheJVMremainsplatform-independent,allowingthesameJavaapplicationtorunondifferen

为什么Java是开发跨平台桌面应用程序的流行选择?为什么Java是开发跨平台桌面应用程序的流行选择?Apr 25, 2025 am 12:23 AM

javaispopularforcross-platformdesktopapplicationsduetoits“ writeonce,runanywhere”哲学。1)itusesbytbytybytecebytecodethatrunsonanyjvm-platform.2)librarieslikeslikeslikeswingingandjavafxhelpcreatenative-lookingenative-lookinguisis.3)

讨论可能需要在Java中编写平台特定代码的情况。讨论可能需要在Java中编写平台特定代码的情况。Apr 25, 2025 am 12:22 AM

在Java中编写平台特定代码的原因包括访问特定操作系统功能、与特定硬件交互和优化性能。1)使用JNA或JNI访问Windows注册表;2)通过JNI与Linux特定硬件驱动程序交互;3)通过JNI使用Metal优化macOS上的游戏性能。尽管如此,编写平台特定代码会影响代码的可移植性、增加复杂性、可能带来性能开销和安全风险。

与平台独立性相关的Java开发的未来趋势是什么?与平台独立性相关的Java开发的未来趋势是什么?Apr 25, 2025 am 12:12 AM

Java将通过云原生应用、多平台部署和跨语言互操作进一步提升平台独立性。1)云原生应用将使用GraalVM和Quarkus提升启动速度。2)Java将扩展到嵌入式设备、移动设备和量子计算机。3)通过GraalVM,Java将与Python、JavaScript等语言无缝集成,增强跨语言互操作性。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

MinGW - 适用于 Windows 的极简 GNU

MinGW - 适用于 Windows 的极简 GNU

这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )专业的PHP集成开发工具

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器