Maison  >  Article  >  Java  >  Comment le projet de microservice Springboot intègre Kafka pour implémenter les fonctions de téléchargement et de suppression d'articles

Comment le projet de microservice Springboot intègre Kafka pour implémenter les fonctions de téléchargement et de suppression d'articles

WBOY
WBOYavant
2023-05-16 20:52:04974parcourir

Un : Démarrage rapide de l'envoi de messages Kafka

1. Passer un message de chaîne

(1) Envoyer un message

Créer un package Controller et écrire une classe de test pour envoyer des messages

package com.my.kafka.controller;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
 
    @GetMapping("hello")
    public String helloProducer(){
        kafkaTemplate.send("my-topic","Hello~");
        return "ok";
    }
}
(2) Écouter les messages

Écrire un test La classe est utilisée pour recevoir des messages :

package com.my.kafka.listener;
 
import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class HelloListener {
    @KafkaListener(topics = "my-topic")
    public void helloListener(String message) {
        if(StringUtils.isNotBlank(message)) {
            System.out.println(message);
        }
    }
}
(3) Résultats du test

Ouvrez le navigateur et entrez localhost:9991/hello, puis accédez à la console pour afficher le message. Vous pouvez voir que le message réussi a été surveillé. et consommé.

Comment le projet de microservice Springboot intègre Kafka pour implémenter les fonctions de téléchargement et de suppression darticles

2. Passer des messages d'objet

Kafka est actuellement intégré à Springboot, car le sérialiseur est StringSerializer, si vous devez transmettre l'objet à ce moment-là, il existe deux manières :

Méthode 1 : Vous pouvez personnaliser le sérialiseur, Il existe de nombreux types d’objets, et cette méthode n’est pas très polyvalente et ne sera pas présentée ici.

Méthode 2 : Vous pouvez convertir l'objet à transférer en chaîne json, puis le convertir en objet après réception du message. Cette méthode est utilisée dans ce projet.

(1) Modifier le code producteur
@GetMapping("hello")
public String helloProducer(){
    User user = new User();
    user.setName("赵四");
    user.setAge(20);
    kafkaTemplate.send("my-topic", JSON.toJSONString(user));
    return "ok";
}
(2) Résultat du test

Comment le projet de microservice Springboot intègre Kafka pour implémenter les fonctions de téléchargement et de suppression darticles

Vous pouvez voir que les paramètres de l'objet sont reçus avec succès Pour utiliser l'objet ultérieurement, il vous suffit de le convertir en objet Utilisateur.

Deux : Introduction à la fonction

1. Analyse des exigences

Après la publication d'un article, il peut y avoir des erreurs ou d'autres raisons dans l'article. Nous mettrons en œuvre la fonction de téléchargement et de suppression de l'article du côté de la gestion des articles (voir le). image ci-dessous), c'est-à-dire que lorsque le côté gestion retire un article des étagères, l'article ne sera plus affiché sur l'extrémité mobile. Ce n'est qu'une fois l'article réinscrit que les informations sur l'article peuvent être vues sur l'extrémité mobile.

Comment le projet de microservice Springboot intègre Kafka pour implémenter les fonctions de téléchargement et de suppression darticles

2. Analyse logique

Comment le projet de microservice Springboot intègre Kafka pour implémenter les fonctions de téléchargement et de suppression darticles

Après avoir reçu les paramètres transmis par le front-end, le back-end doit d'abord effectuer une vérification. Si les paramètres ne sont pas vides, l'exécution peut continuer. en fonction de l'identifiant de l'article transmis par le front-end. (ID d'article côté We-media) Interrogez les informations sur l'article de la base de données self-media et déterminez si l'article a été publié, car seuls les articles qui ont été examinés et publiés avec succès peuvent l'être. téléchargé et supprimé. Une fois que le microservice côté auto-média a modifié l'état de téléchargement et de suppression de l'article, il peut envoyer un message à Kafka. Le message est un objet Map. Les données qui y sont stockées sont l'identifiant de l'article du terminal mobile et le paramètre d'activation transmis. depuis le front-end. Bien sûr, ce message doit être envoyé. L'objet Map peut être envoyé après avoir été converti en chaîne JSON.

Le microservice d'article écoute le message envoyé par Kafka, convertit la chaîne JSON en un objet Map, puis obtient les paramètres pertinents pour modifier l'état haut et bas de l'article mobile.

Trois : Préparation

1. Introduire les dépendances

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2 Définir les constantes

package com.my.common.constans;
public class WmNewsMessageConstants {
    public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}

3.Informations de configuration Kafka

Puisque j'utilise Nacos comme centre d'enregistrement, les informations de configuration peuvent être placées sur Nacos.

(1) Configuration du terminal auto-média

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

(2) Configuration du terminal mobile

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Quatre : Implémentation du code

1 Terminal auto-média

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 文章下架或上架
 * @param id
 * @param enable
 * @return
 */
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
    log.info("执行文章上下架操作...");
    if(id == null || enable == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    //根据id获取文章
    WmNews news = getById(id);
    if(news == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
    }
    //获取当前文章状态
    Short status = news.getStatus();
    if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");
    }
 
    //更改文章状态
    news.setEnable(enable.shortValue());
    updateById(news);
    log.info("更改文章上架状态{}-->{}",status,news.getEnable());
 
    //发送消息到Kafka
    Map<String, Object> map = new HashMap<>();
    map.put("articleId",news.getArticleId());
    map.put("enable",enable.shortValue());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
    log.info("发送消息到Kafka...");
 
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

2.

package com.my.article.listener;
 
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
 
 
@Slf4j
@Component
public class EnableListener {
    @Autowired
    private ApArticleService apArticleService;
 
    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void downOrUp(String message) {
        if(StringUtils.isNotBlank(message)) {
            log.info("监听到消息{}",message);
            apArticleService.downOrUp(message);
        }
    }
}

(2) Recevez des actualités et modifiez le statut de l'article

/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
    Map map = JSON.parseObject(message, Map.class);
    //获取文章id
    Long articleId = (Long) map.get("articleId");
    //获取文章待修改状态
    Integer enable = (Integer) map.get("enable");
    //查询文章配置
    ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
            (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
    if(apArticleConfig != null) {
        //上架
        if(enable == 1) {
            log.info("文章重新上架");
            apArticleConfig.setIsDown(false);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
        //下架
        if(enable == 0) {
            log.info("文章下架");
            apArticleConfig.setIsDown(true);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
    }
    else {
        throw new RuntimeException("文章信息不存在");
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer