>  기사  >  Java  >  Springboot 마이크로서비스 프로젝트가 Kafka를 통합하여 기사 업로드 및 목록 삭제 기능을 구현하는 방법

Springboot 마이크로서비스 프로젝트가 Kafka를 통합하여 기사 업로드 및 목록 삭제 기능을 구현하는 방법

WBOY
WBOY앞으로
2023-05-16 20:52:04975검색

One: Kafka 메시지 보내기 빠른 시작

1. 문자열 메시지 전달

(1) 메시지 보내기

컨트롤러 패키지 만들기 및 메시지 보내기를 위한 테스트 클래스 작성

package com.my.kafka.controller;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
 
    @GetMapping("hello")
    public String helloProducer(){
        kafkaTemplate.send("my-topic","Hello~");
        return "ok";
    }
}
(2) 메시지 듣기

쓰기 test 클래스는 메시지를 수신하는 데 사용됩니다.

package com.my.kafka.listener;
 
import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class HelloListener {
    @KafkaListener(topics = "my-topic")
    public void helloListener(String message) {
        if(StringUtils.isNotBlank(message)) {
            System.out.println(message);
        }
    }
}
(3) 테스트 결과

브라우저를 열고 localhost:9991/hello를 입력한 후 콘솔로 이동하여 메시지를 보면 성공적인 메시지가 모니터링되는 것을 볼 수 있습니다. 그리고 소비되었습니다.

Springboot 마이크로서비스 프로젝트가 Kafka를 통합하여 기사 업로드 및 목록 삭제 기능을 구현하는 방법

2. 개체 메시지 전달

현재 springboot 통합 kafka는 직렬 변환기가 StringSerializer이기 때문에 이때 개체를 전달해야 하는 경우 두 가지 방법이 있습니다.

방법 1: 직렬 변환기를 사용자 정의할 수 있습니다. 객체 유형은 다양하며 이 방법은 그다지 다양하지 않으므로 여기서는 소개하지 않습니다.

방법 2: 전송할 객체를 json 문자열로 변환한 후 메시지를 받은 후 객체로 변환하는 방법이 이 프로젝트에서 사용됩니다.

(1) 생산자 코드 수정
@GetMapping("hello")
public String helloProducer(){
    User user = new User();
    user.setName("赵四");
    user.setAge(20);
    kafkaTemplate.send("my-topic", JSON.toJSONString(user));
    return "ok";
}
(2) 결과 테스트

Springboot 마이크로서비스 프로젝트가 Kafka를 통합하여 기사 업로드 및 목록 삭제 기능을 구현하는 방법

객체 매개변수를 성공적으로 수신한 것을 볼 수 있으며, 나중에 객체를 사용하려면 User 객체로 변환하기만 하면 됩니다.

둘: 기능 소개

1. 요구사항 분석

글을 게시한 후, 글에 오류나 기타 사유가 있을 수 있습니다. 글 관리 측면에서 글의 업로드 및 삭제 기능을 구현하겠습니다. 아래 그림), 즉, 관리 측이 서가에서 기사를 제거하면 기사가 더 이상 모바일 측에 표시되지 않습니다. 기사가 다시 나열된 후에야 모바일 측에서 기사 정보를 볼 수 있습니다.

Springboot 마이크로서비스 프로젝트가 Kafka를 통합하여 기사 업로드 및 목록 삭제 기능을 구현하는 방법

2. 논리적 분석

Springboot 마이크로서비스 프로젝트가 Kafka를 통합하여 기사 업로드 및 목록 삭제 기능을 구현하는 방법

백엔드에서 전달된 매개변수를 받은 후 먼저 매개변수가 비어 있지 않으면 실행을 계속해야 합니다. (위미디어측 기사 ID) 프론트엔드에서 전달한 기사 ID를 기준으로 자체 미디어 데이터베이스의 기사 정보를 조회하여 기사 게재 여부를 판단합니다. 업로드 및 삭제되었습니다. 셀프 미디어 측 마이크로서비스는 기사 업 및 다운 상태를 수정한 후 Kafka에 메시지를 보낼 수 있습니다. 메시지는 Map 객체이며 여기에 저장된 데이터는 모바일 단말기의 업 및 다운 매개변수입니다. 물론 이 메시지는 프런트엔드에서 전달되어야 합니다. Map 객체는 JSON 문자열로 변환된 후 전송될 수 있습니다.

기사 마이크로서비스는 Kafka가 보낸 메시지를 수신하고 JSON 문자열을 Map 객체로 변환한 다음 관련 매개변수를 가져와 모바일 기사의 업 및 다운 상태를 수정합니다.

Three: 준비

1. 종속성 소개

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2. 상수 정의

package com.my.common.constans;
public class WmNewsMessageConstants {
    public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}

3.Kafka 구성 정보

Nacos를 등록 센터로 사용하므로 구성 정보는 Nacos에 배치할 수 있습니다.

(1) 셀프 미디어 단말기 구성

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

(2) 모바일 단말기 구성

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4: 코드 구현

1. 셀프 미디어 단말기

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 文章下架或上架
 * @param id
 * @param enable
 * @return
 */
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
    log.info("执行文章上下架操作...");
    if(id == null || enable == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    //根据id获取文章
    WmNews news = getById(id);
    if(news == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
    }
    //获取当前文章状态
    Short status = news.getStatus();
    if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");
    }
 
    //更改文章状态
    news.setEnable(enable.shortValue());
    updateById(news);
    log.info("更改文章上架状态{}-->{}",status,news.getEnable());
 
    //发送消息到Kafka
    Map<String, Object> map = new HashMap<>();
    map.put("articleId",news.getArticleId());
    map.put("enable",enable.shortValue());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
    log.info("发送消息到Kafka...");
 
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

2. 리스너를 설정합니다.

package com.my.article.listener;
 
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
 
 
@Slf4j
@Component
public class EnableListener {
    @Autowired
    private ApArticleService apArticleService;
 
    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void downOrUp(String message) {
        if(StringUtils.isNotBlank(message)) {
            log.info("监听到消息{}",message);
            apArticleService.downOrUp(message);
        }
    }
}

(2) 뉴스 받기 및 기사 상태 수정

/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
    Map map = JSON.parseObject(message, Map.class);
    //获取文章id
    Long articleId = (Long) map.get("articleId");
    //获取文章待修改状态
    Integer enable = (Integer) map.get("enable");
    //查询文章配置
    ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
            (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
    if(apArticleConfig != null) {
        //上架
        if(enable == 1) {
            log.info("文章重新上架");
            apArticleConfig.setIsDown(false);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
        //下架
        if(enable == 0) {
            log.info("文章下架");
            apArticleConfig.setIsDown(true);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
    }
    else {
        throw new RuntimeException("文章信息不存在");
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

위 내용은 Springboot 마이크로서비스 프로젝트가 Kafka를 통합하여 기사 업로드 및 목록 삭제 기능을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제