Erstellen Sie ein Controller-Paket und schreiben Sie eine Testklasse zum Senden von Nachrichten
package com.my.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("hello") public String helloProducer(){ kafkaTemplate.send("my-topic","Hello~"); return "ok"; } }
Schreiben Sie eine test Die Klasse wird zum Empfangen von Nachrichten verwendet:
package com.my.kafka.listener; import org.junit.platform.commons.util.StringUtils; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class HelloListener { @KafkaListener(topics = "my-topic") public void helloListener(String message) { if(StringUtils.isNotBlank(message)) { System.out.println(message); } } }
Öffnen Sie den Browser, geben Sie localhost:9991/hello ein und gehen Sie dann zur Konsole, um die Nachricht anzuzeigen. Sie können sehen, dass die erfolgreiche Nachricht überwacht wurde und verzehrt.
Derzeit ist Springboot in Kafka integriert, da der Serializer StringSerializer ist. Wenn Sie das Objekt zu diesem Zeitpunkt übergeben müssen, gibt es zwei Möglichkeiten:
Methode 1: Sie können den Serializer anpassen. Es gibt viele Objekttypen. Diese Methode ist nicht sehr vielseitig und wird hier nicht vorgestellt.
Methode 2: Sie können das zu übertragende Objekt in eine JSON-Zeichenfolge umwandeln und es dann nach Erhalt der Nachricht in ein Objekt umwandeln. Diese Methode wird in diesem Projekt verwendet.
@GetMapping("hello") public String helloProducer(){ User user = new User(); user.setName("赵四"); user.setAge(20); kafkaTemplate.send("my-topic", JSON.toJSONString(user)); return "ok"; }
Sie können sehen, dass die Objektparameter erfolgreich empfangen wurden. Um das Objekt später zu verwenden, müssen Sie es nur in ein Benutzerobjekt konvertieren.
Nach der Veröffentlichung eines Artikels kann es zu Fehlern oder anderen Gründen im Artikel kommen. Wir werden die Funktion zum Hochladen und Entfernen des Artikels auf der Seite der Artikelverwaltung implementieren Das heißt, wenn die Verwaltungsseite einen Artikel aus den Regalen entfernt, wird der Artikel nicht mehr auf der mobilen Seite angezeigt. Erst nachdem der Artikel erneut gelistet wurde, können die Artikelinformationen auf der mobilen Seite angezeigt werden.
Nach Erhalt der vom Frontend übergebenen Parameter muss zunächst eine Überprüfung durchgeführt werden. Wenn die Parameter nicht leer sind, kann die Ausführung fortgesetzt werden Basierend auf der vom Frontend übergebenen Artikel-ID (Artikel-ID der We-Media-Seite). Fragen Sie die Artikelinformationen der Self-Media-Datenbank ab und stellen Sie fest, ob der Artikel veröffentlicht wurde, da nur Artikel veröffentlicht werden können, die erfolgreich überprüft und veröffentlicht wurden hochgeladen und entfernt. Nachdem der Self-Media-Mikrodienst den Upload- und Delisting-Status des Artikels geändert hat, kann er eine Nachricht an Kafka senden. Die darin gespeicherten Daten sind die Artikel-ID des mobilen Endgeräts und der vom Gerät übergebene Aktivierungsparameter Natürlich muss diese Nachricht in einen JSON-String umgewandelt werden, bevor sie gesendet werden kann.
Der Artikel-Mikrodienst hört die von Kafka gesendete Nachricht ab, konvertiert die JSON-Zeichenfolge in ein Kartenobjekt und ruft dann die relevanten Parameter ab, um den Auf- und Ab-Status des mobilen Artikels zu ändern.
<!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
package com.my.common.constans; public class WmNewsMessageConstants { public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic"; }
Da ich Nacos als Registrierungszentrum verwende, können die Konfigurationsinformationen auf Nacos platziert werden.
(1) Self-Media-Terminal-Konfiguration
spring: kafka: bootstrap-servers: 4.234.52.122:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
(2) Mobile-Terminal-Konfiguration
spring: kafka: bootstrap-servers: 4.234.52.122:9092 consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@Autowired private KafkaTemplate<String,String> kafkaTemplate; /** * 文章下架或上架 * @param id * @param enable * @return */ @Override public ResponseResult downOrUp(Integer id,Integer enable) { log.info("执行文章上下架操作..."); if(id == null || enable == null) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //根据id获取文章 WmNews news = getById(id); if(news == null) { return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在"); } //获取当前文章状态 Short status = news.getStatus(); if(!status.equals(WmNews.Status.PUBLISHED.getCode())) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架"); } //更改文章状态 news.setEnable(enable.shortValue()); updateById(news); log.info("更改文章上架状态{}-->{}",status,news.getEnable()); //发送消息到Kafka Map<String, Object> map = new HashMap<>(); map.put("articleId",news.getArticleId()); map.put("enable",enable.shortValue()); kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); log.info("发送消息到Kafka..."); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
(1) Richten Sie den Listener ein
package com.my.article.listener; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.my.article.service.ApArticleService; import com.my.common.constans.WmNewsMessageConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.kafka.annotation.KafkaListener; @Slf4j @Component public class EnableListener { @Autowired private ApArticleService apArticleService; @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC) public void downOrUp(String message) { if(StringUtils.isNotBlank(message)) { log.info("监听到消息{}",message); apArticleService.downOrUp(message); } } }
(2) Erhalten Sie Neuigkeiten und ändern Sie den Artikelstatus
/** * 文章上下架 * @param message * @return */ @Override public ResponseResult downOrUp(String message) { Map map = JSON.parseObject(message, Map.class); //获取文章id Long articleId = (Long) map.get("articleId"); //获取文章待修改状态 Integer enable = (Integer) map.get("enable"); //查询文章配置 ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId)); if(apArticleConfig != null) { //上架 if(enable == 1) { log.info("文章重新上架"); apArticleConfig.setIsDown(false); apArticleConfigMapper.updateById(apArticleConfig); } //下架 if(enable == 0) { log.info("文章下架"); apArticleConfig.setIsDown(true); apArticleConfigMapper.updateById(apArticleConfig); } } else { throw new RuntimeException("文章信息不存在"); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
Das obige ist der detaillierte Inhalt vonWie das Springboot-Microservice-Projekt Kafka integriert, um Funktionen zum Hochladen und Entfernen von Artikeln zu implementieren. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!