<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.0</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.10.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.24</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build>
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author: huangyibo * @Date: 2022/5/28 2:32 * @Description: Pulsar 参数类 */ @Component @ConfigurationProperties(prefix = "tdmq.pulsar") @Data public class PulsarProperties { /** * 接入地址 */ private String serviceurl; /** * 命名空间tdc */ private String tdcNamespace; /** * 角色tdc的token */ private String tdcToken; /** * 集群name */ private String cluster; /** * topicMap */ private Map<String, String> topicMap; /** * 订阅 */ private Map<String, String> subMap; /** * 开关 on:Consumer可用 ||||| off:Consumer断路 */ private String onOff; }
import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author: huangyibo * @Date: 2022/5/28 2:33 * @Description: Pulsar 配置类 */ @Configuration @EnableConfigurationProperties(PulsarProperties.class) public class PulsarConfig { @Autowired PulsarProperties pulsarProperties; @Bean public PulsarClient getPulsarClient() { try { return PulsarClient.builder() .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken())) .serviceUrl(pulsarProperties.getServiceurl()) .build(); } catch (PulsarClientException e) { System.out.println(e); throw new RuntimeException("初始化Pulsar Client失败"); } } }
import com.yibo.pulsar.pojo.User; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageListener; import org.springframework.stereotype.Component; /** * @Author: huangyibo * @Date: 2022/5/28 2:37 * @Description: */ @Component public class UserMessageListener implements MessageListener<User> { @Override public void received(Consumer<User> consumer, Message<User> msg) { try { User user = msg.getValue(); System.out.println(user); consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); } } } import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageListener; import org.springframework.stereotype.Component; /** * @Author: huangyibo * @Date: 2022/5/28 2:37 * @Description: */ @Component public class StringMessageListener implements MessageListener<String> { @Override public void received(Consumer<String> consumer, Message<String> msg) { try { System.out.println(msg.getValue()); consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); } } }
위 내용은 Pulsar를 SpringBoot와 통합하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!