Créer un package Controller et écrire une classe de test pour envoyer des messages
package com.my.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("hello") public String helloProducer(){ kafkaTemplate.send("my-topic","Hello~"); return "ok"; } }
Écrire un test La classe est utilisée pour recevoir des messages :
package com.my.kafka.listener; import org.junit.platform.commons.util.StringUtils; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class HelloListener { @KafkaListener(topics = "my-topic") public void helloListener(String message) { if(StringUtils.isNotBlank(message)) { System.out.println(message); } } }
Ouvrez le navigateur et entrez localhost:9991/hello, puis accédez à la console pour afficher le message. Vous pouvez voir que le message réussi a été surveillé. et consommé.
Kafka est actuellement intégré à Springboot, car le sérialiseur est StringSerializer, si vous devez transmettre l'objet à ce moment-là, il existe deux manières :
Méthode 1 : Vous pouvez personnaliser le sérialiseur, Il existe de nombreux types d’objets, et cette méthode n’est pas très polyvalente et ne sera pas présentée ici.
Méthode 2 : Vous pouvez convertir l'objet à transférer en chaîne json, puis le convertir en objet après réception du message. Cette méthode est utilisée dans ce projet.
@GetMapping("hello") public String helloProducer(){ User user = new User(); user.setName("赵四"); user.setAge(20); kafkaTemplate.send("my-topic", JSON.toJSONString(user)); return "ok"; }
Vous pouvez voir que les paramètres de l'objet sont reçus avec succès Pour utiliser l'objet ultérieurement, il vous suffit de le convertir en objet Utilisateur.
Après la publication d'un article, il peut y avoir des erreurs ou d'autres raisons dans l'article. Nous mettrons en œuvre la fonction de téléchargement et de suppression de l'article du côté de la gestion des articles (voir le). image ci-dessous), c'est-à-dire que lorsque le côté gestion retire un article des étagères, l'article ne sera plus affiché sur l'extrémité mobile. Ce n'est qu'une fois l'article réinscrit que les informations sur l'article peuvent être vues sur l'extrémité mobile.
Après avoir reçu les paramètres transmis par le front-end, le back-end doit d'abord effectuer une vérification. Si les paramètres ne sont pas vides, l'exécution peut continuer. en fonction de l'identifiant de l'article transmis par le front-end. (ID d'article côté We-media) Interrogez les informations sur l'article de la base de données self-media et déterminez si l'article a été publié, car seuls les articles qui ont été examinés et publiés avec succès peuvent l'être. téléchargé et supprimé. Une fois que le microservice côté auto-média a modifié l'état de téléchargement et de suppression de l'article, il peut envoyer un message à Kafka. Le message est un objet Map. Les données qui y sont stockées sont l'identifiant de l'article du terminal mobile et le paramètre d'activation transmis. depuis le front-end. Bien sûr, ce message doit être envoyé. L'objet Map peut être envoyé après avoir été converti en chaîne JSON.
Le microservice d'article écoute le message envoyé par Kafka, convertit la chaîne JSON en un objet Map, puis obtient les paramètres pertinents pour modifier l'état haut et bas de l'article mobile.
<!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
package com.my.common.constans; public class WmNewsMessageConstants { public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic"; }
Puisque j'utilise Nacos comme centre d'enregistrement, les informations de configuration peuvent être placées sur Nacos.
(1) Configuration du terminal auto-média
spring: kafka: bootstrap-servers: 4.234.52.122:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
(2) Configuration du terminal mobile
spring: kafka: bootstrap-servers: 4.234.52.122:9092 consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@Autowired private KafkaTemplate<String,String> kafkaTemplate; /** * 文章下架或上架 * @param id * @param enable * @return */ @Override public ResponseResult downOrUp(Integer id,Integer enable) { log.info("执行文章上下架操作..."); if(id == null || enable == null) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //根据id获取文章 WmNews news = getById(id); if(news == null) { return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在"); } //获取当前文章状态 Short status = news.getStatus(); if(!status.equals(WmNews.Status.PUBLISHED.getCode())) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架"); } //更改文章状态 news.setEnable(enable.shortValue()); updateById(news); log.info("更改文章上架状态{}-->{}",status,news.getEnable()); //发送消息到Kafka Map<String, Object> map = new HashMap<>(); map.put("articleId",news.getArticleId()); map.put("enable",enable.shortValue()); kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); log.info("发送消息到Kafka..."); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
package com.my.article.listener; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.my.article.service.ApArticleService; import com.my.common.constans.WmNewsMessageConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.kafka.annotation.KafkaListener; @Slf4j @Component public class EnableListener { @Autowired private ApArticleService apArticleService; @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC) public void downOrUp(String message) { if(StringUtils.isNotBlank(message)) { log.info("监听到消息{}",message); apArticleService.downOrUp(message); } } }
(2) Recevez des actualités et modifiez le statut de l'article
/** * 文章上下架 * @param message * @return */ @Override public ResponseResult downOrUp(String message) { Map map = JSON.parseObject(message, Map.class); //获取文章id Long articleId = (Long) map.get("articleId"); //获取文章待修改状态 Integer enable = (Integer) map.get("enable"); //查询文章配置 ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId)); if(apArticleConfig != null) { //上架 if(enable == 1) { log.info("文章重新上架"); apArticleConfig.setIsDown(false); apArticleConfigMapper.updateById(apArticleConfig); } //下架 if(enable == 0) { log.info("文章下架"); apArticleConfig.setIsDown(true); apArticleConfigMapper.updateById(apArticleConfig); } } else { throw new RuntimeException("文章信息不存在"); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!