ホームページ  >  記事  >  Java  >  Springboot マイクロサービス プロジェクトが Kafka を統合して記事のアップロード機能とリストから削除する機能を実装する方法

Springboot マイクロサービス プロジェクトが Kafka を統合して記事のアップロード機能とリストから削除する機能を実装する方法

WBOY
WBOY転載
2023-05-16 20:52:041011ブラウズ

1: Kafka メッセージ送信のクイック スタート

1. 文字列メッセージを渡す

(1) メッセージの送信

コントローラー パッケージを作成し、送信用のテスト クラスを作成します。メッセージ

package com.my.kafka.controller;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
 
    @GetMapping("hello")
    public String helloProducer(){
        kafkaTemplate.send("my-topic","Hello~");
        return "ok";
    }
}
(2) メッセージをリッスンする

メッセージを受信するテスト クラスを作成します:

package com.my.kafka.listener;
 
import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class HelloListener {
    @KafkaListener(topics = "my-topic")
    public void helloListener(String message) {
        if(StringUtils.isNotBlank(message)) {
            System.out.println(message);
        }
    }
}
(3) テスト結果

ブラウザ入力 localhost を開きます:9991/hello, and then go to the console to view the message. 成功したメッセージが監視され、消費されていることがわかります。

Springboot マイクロサービス プロジェクトが Kafka を統合して記事のアップロード機能とリストから削除する機能を実装する方法

2. オブジェクト メッセージの受け渡し

現在 Springboot には Kafka が統合されており、シリアライザーは StringSerializer であるため、この時点でオブジェクトを渡す必要がある場合は、次の 2 つがあります。方法 方法:

方法 1: 多くのオブジェクト タイプでシリアライザーをカスタマイズできますが、この方法はあまり汎用性が高くないため、ここでは紹介しません。

方法2: 転送するオブジェクトをjson文字列に変換し、メッセージ受信後にオブジェクトに変換する方法 本プロジェクトではこの方法を使用します。

(1) プロデューサコードを変更します
@GetMapping("hello")
public String helloProducer(){
    User user = new User();
    user.setName("赵四");
    user.setAge(20);
    kafkaTemplate.send("my-topic", JSON.toJSONString(user));
    return "ok";
}
(2) 結果テスト

Springboot マイクロサービス プロジェクトが Kafka を統合して記事のアップロード機能とリストから削除する機能を実装する方法

すべてのオブジェクトパラメータが正常に受信されたことがわかります。 , 後で このオブジェクトを使用するには、それを User オブジェクトに変換するだけです。

2: 機能紹介

1. 要件分析

記事公開後、記事に誤り等がある場合がございます。記事管理側のアップロード・削除機能(下図参照)、つまり管理端末が商品棚から商品を削除すると、モバイル端末にはその商品が表示されなくなり、再出品して初めて商品が表示されます。記事情報をモバイル端末でもご覧いただけます。

Springboot マイクロサービス プロジェクトが Kafka を統合して記事のアップロード機能とリストから削除する機能を実装する方法

#2. 論理分析

Springboot マイクロサービス プロジェクトが Kafka を統合して記事のアップロード機能とリストから削除する機能を実装する方法

#フロントエンドから渡されたパラメータをバックエンドが受け取った後、最初に検証を行う必要があります。実行を続行する前にパラメータが空ではありません。まず、フロントエンドから渡された記事 ID (セルフメディア エンド記事 ID) に基づいてセルフメディア データベースの記事情報をクエリする必要があります。レビューのみが成功し成功するため、記事が公開されたかどうかを判断します。アップロードまたは削除できるのは公開された記事のみです。セルフメディア側のマイクロサービスが記事のアップロードとリスト解除のステータスを変更した後、Kafka にメッセージを送信できます。メッセージは Map オブジェクトです。そこに格納されるデータは、モバイル端末の記事 ID と渡されたenable パラメータです。もちろん、このメッセージは、Map オブジェクトを JSON 文字列に変換して送信できます。

記事マイクロサービスは、Kafka によって送信されたメッセージをリッスンし、JSON 文字列を Map オブジェクトに変換し、関連するパラメーターを取得して、モバイル記事のアップとダウンのステータスを変更します。

3: 早期の準備

1.依存関係の導入

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2.定数の定義

package com.my.common.constans;
public class WmNewsMessageConstants {
    public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}

3.Kafka設定情報

理由Nacos を登録センターとして使用しているため、構成情報は Nacos に置くことができます。

(1) セルフメディア端末の構成

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

(2) モバイル端末の構成

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4: コード実装

1. セルフメディア端末

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 文章下架或上架
 * @param id
 * @param enable
 * @return
 */
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
    log.info("执行文章上下架操作...");
    if(id == null || enable == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    //根据id获取文章
    WmNews news = getById(id);
    if(news == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
    }
    //获取当前文章状态
    Short status = news.getStatus();
    if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");
    }
 
    //更改文章状态
    news.setEnable(enable.shortValue());
    updateById(news);
    log.info("更改文章上架状态{}-->{}",status,news.getEnable());
 
    //发送消息到Kafka
    Map<String, Object> map = new HashMap<>();
    map.put("articleId",news.getArticleId());
    map.put("enable",enable.shortValue());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
    log.info("发送消息到Kafka...");
 
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

2. モバイル端末

(1) リスナーの設定

package com.my.article.listener;
 
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
 
 
@Slf4j
@Component
public class EnableListener {
    @Autowired
    private ApArticleService apArticleService;
 
    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void downOrUp(String message) {
        if(StringUtils.isNotBlank(message)) {
            log.info("监听到消息{}",message);
            apArticleService.downOrUp(message);
        }
    }
}

(2) メッセージの取得と記事ステータスの変更

/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
    Map map = JSON.parseObject(message, Map.class);
    //获取文章id
    Long articleId = (Long) map.get("articleId");
    //获取文章待修改状态
    Integer enable = (Integer) map.get("enable");
    //查询文章配置
    ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
            (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
    if(apArticleConfig != null) {
        //上架
        if(enable == 1) {
            log.info("文章重新上架");
            apArticleConfig.setIsDown(false);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
        //下架
        if(enable == 0) {
            log.info("文章下架");
            apArticleConfig.setIsDown(true);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
    }
    else {
        throw new RuntimeException("文章信息不存在");
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

以上がSpringboot マイクロサービス プロジェクトが Kafka を統合して記事のアップロード機能とリストから削除する機能を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。