Heim  >  Artikel  >  Java  >  Wie das Springboot-Microservice-Projekt Kafka integriert, um Funktionen zum Hochladen und Entfernen von Artikeln zu implementieren

Wie das Springboot-Microservice-Projekt Kafka integriert, um Funktionen zum Hochladen und Entfernen von Artikeln zu implementieren

WBOY
WBOYnach vorne
2023-05-16 20:52:041008Durchsuche

Eins: Schnellstart des Kafka-Nachrichtenversands

1. Übergeben Sie eine Zeichenfolgennachricht

(1) Senden Sie eine Nachricht

Erstellen Sie ein Controller-Paket und schreiben Sie eine Testklasse zum Senden von Nachrichten

package com.my.kafka.controller;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
 
    @GetMapping("hello")
    public String helloProducer(){
        kafkaTemplate.send("my-topic","Hello~");
        return "ok";
    }
}
(2) Warten Sie auf Nachrichten

Schreiben Sie eine test Die Klasse wird zum Empfangen von Nachrichten verwendet:

package com.my.kafka.listener;
 
import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class HelloListener {
    @KafkaListener(topics = "my-topic")
    public void helloListener(String message) {
        if(StringUtils.isNotBlank(message)) {
            System.out.println(message);
        }
    }
}
(3) Testergebnisse

Öffnen Sie den Browser, geben Sie localhost:9991/hello ein und gehen Sie dann zur Konsole, um die Nachricht anzuzeigen. Sie können sehen, dass die erfolgreiche Nachricht überwacht wurde und verzehrt.

Wie das Springboot-Microservice-Projekt Kafka integriert, um Funktionen zum Hochladen und Entfernen von Artikeln zu implementieren

2. Übergeben von Objektnachrichten

Derzeit ist Springboot in Kafka integriert, da der Serializer StringSerializer ist. Wenn Sie das Objekt zu diesem Zeitpunkt übergeben müssen, gibt es zwei Möglichkeiten:

Methode 1: Sie können den Serializer anpassen. Es gibt viele Objekttypen. Diese Methode ist nicht sehr vielseitig und wird hier nicht vorgestellt.

Methode 2: Sie können das zu übertragende Objekt in eine JSON-Zeichenfolge umwandeln und es dann nach Erhalt der Nachricht in ein Objekt umwandeln. Diese Methode wird in diesem Projekt verwendet.

(1) Ändern Sie den Produzentencode
@GetMapping("hello")
public String helloProducer(){
    User user = new User();
    user.setName("赵四");
    user.setAge(20);
    kafkaTemplate.send("my-topic", JSON.toJSONString(user));
    return "ok";
}
(2) Ergebnistest

Wie das Springboot-Microservice-Projekt Kafka integriert, um Funktionen zum Hochladen und Entfernen von Artikeln zu implementieren

Sie können sehen, dass die Objektparameter erfolgreich empfangen wurden. Um das Objekt später zu verwenden, müssen Sie es nur in ein Benutzerobjekt konvertieren.

Zweitens: Funktionseinführung

1. Anforderungsanalyse

Nach der Veröffentlichung eines Artikels kann es zu Fehlern oder anderen Gründen im Artikel kommen. Wir werden die Funktion zum Hochladen und Entfernen des Artikels auf der Seite der Artikelverwaltung implementieren Das heißt, wenn die Verwaltungsseite einen Artikel aus den Regalen entfernt, wird der Artikel nicht mehr auf der mobilen Seite angezeigt. Erst nachdem der Artikel erneut gelistet wurde, können die Artikelinformationen auf der mobilen Seite angezeigt werden.

Wie das Springboot-Microservice-Projekt Kafka integriert, um Funktionen zum Hochladen und Entfernen von Artikeln zu implementieren

2. Logische Analyse

Wie das Springboot-Microservice-Projekt Kafka integriert, um Funktionen zum Hochladen und Entfernen von Artikeln zu implementieren

Nach Erhalt der vom Frontend übergebenen Parameter muss zunächst eine Überprüfung durchgeführt werden. Wenn die Parameter nicht leer sind, kann die Ausführung fortgesetzt werden Basierend auf der vom Frontend übergebenen Artikel-ID (Artikel-ID der We-Media-Seite). Fragen Sie die Artikelinformationen der Self-Media-Datenbank ab und stellen Sie fest, ob der Artikel veröffentlicht wurde, da nur Artikel veröffentlicht werden können, die erfolgreich überprüft und veröffentlicht wurden hochgeladen und entfernt. Nachdem der Self-Media-Mikrodienst den Upload- und Delisting-Status des Artikels geändert hat, kann er eine Nachricht an Kafka senden. Die darin gespeicherten Daten sind die Artikel-ID des mobilen Endgeräts und der vom Gerät übergebene Aktivierungsparameter Natürlich muss diese Nachricht in einen JSON-String umgewandelt werden, bevor sie gesendet werden kann.

Der Artikel-Mikrodienst hört die von Kafka gesendete Nachricht ab, konvertiert die JSON-Zeichenfolge in ein Kartenobjekt und ruft dann die relevanten Parameter ab, um den Auf- und Ab-Status des mobilen Artikels zu ändern.

Drei: Vorbereitung

1. Abhängigkeiten einführen

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2. Konstanten definieren

package com.my.common.constans;
public class WmNewsMessageConstants {
    public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}

3.Kafka-Konfigurationsinformationen

Da ich Nacos als Registrierungszentrum verwende, können die Konfigurationsinformationen auf Nacos platziert werden.

(1) Self-Media-Terminal-Konfiguration

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

(2) Mobile-Terminal-Konfiguration

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Vier: Code-Implementierung

1. Self-Media-Terminal

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 文章下架或上架
 * @param id
 * @param enable
 * @return
 */
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
    log.info("执行文章上下架操作...");
    if(id == null || enable == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    //根据id获取文章
    WmNews news = getById(id);
    if(news == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
    }
    //获取当前文章状态
    Short status = news.getStatus();
    if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");
    }
 
    //更改文章状态
    news.setEnable(enable.shortValue());
    updateById(news);
    log.info("更改文章上架状态{}-->{}",status,news.getEnable());
 
    //发送消息到Kafka
    Map<String, Object> map = new HashMap<>();
    map.put("articleId",news.getArticleId());
    map.put("enable",enable.shortValue());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
    log.info("发送消息到Kafka...");
 
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

2. Mobiles Terminal

(1) Richten Sie den Listener ein

package com.my.article.listener;
 
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
 
 
@Slf4j
@Component
public class EnableListener {
    @Autowired
    private ApArticleService apArticleService;
 
    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void downOrUp(String message) {
        if(StringUtils.isNotBlank(message)) {
            log.info("监听到消息{}",message);
            apArticleService.downOrUp(message);
        }
    }
}

(2) Erhalten Sie Neuigkeiten und ändern Sie den Artikelstatus

/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
    Map map = JSON.parseObject(message, Map.class);
    //获取文章id
    Long articleId = (Long) map.get("articleId");
    //获取文章待修改状态
    Integer enable = (Integer) map.get("enable");
    //查询文章配置
    ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
            (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
    if(apArticleConfig != null) {
        //上架
        if(enable == 1) {
            log.info("文章重新上架");
            apArticleConfig.setIsDown(false);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
        //下架
        if(enable == 0) {
            log.info("文章下架");
            apArticleConfig.setIsDown(true);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
    }
    else {
        throw new RuntimeException("文章信息不存在");
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

Das obige ist der detaillierte Inhalt vonWie das Springboot-Microservice-Projekt Kafka integriert, um Funktionen zum Hochladen und Entfernen von Artikeln zu implementieren. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen