>Java >java지도 시간 >Netty를 통합하고 데이터 전송을 위해 Protobuf를 사용하는 SpringBoot 구현 프로세스

Netty를 통합하고 데이터 전송을 위해 Protobuf를 사용하는 SpringBoot 구현 프로세스

不言
不言원래의
2018-09-12 16:20:273363검색

이 기사의 내용은 Netty를 통합하고 Protobuf를 사용하여 데이터 전송을 수행하는 과정에 대한 내용입니다. 도움이 필요한 친구들이 참고할 수 있기를 바랍니다.

머리말

이 글에서는 주로 SpringBoot의 Netty 통합과 데이터 전송을 위한 Protobuf 사용을 소개합니다. Protobuf는 사용법을 간략하게 소개하겠습니다. Netty에 대해서는 이전 글에서 간략하게 소개한 바 있으므로 여기서는 자세히 다루지 않겠습니다.

Protobuf

소개

protocolbuffer(이하 PB)는 Google의 데이터 교환 형식으로, 언어와 플랫폼에 독립적입니다. Google은 java, c#, c++, go, python 등 여러 언어로 구현을 제공합니다. 각 구현에는 해당 언어에 대한 컴파일러와 라이브러리 파일이 포함되어 있습니다. 바이너리 형식이므로 데이터 교환에 xml을 사용하는 것보다 훨씬 빠릅니다. 분산 애플리케이션 간의 데이터 통신이나 이기종 환경에서의 데이터 교환에 사용할 수 있습니다. 효율성과 호환성이 뛰어난 바이너리 데이터 전송 형식으로 네트워크 전송, 구성 파일, 데이터 저장 등 다양한 분야에서 사용할 수 있습니다.

공식 주소: https://github.com/google/protobuf

Usage

여기 사용법은 Java 관련 사용법만 소개합니다.
먼저 전송해야 할 파일을 정의하는 proto 파일을 만들어야 합니다.
예를 들어, 주로 번호, 이름, 나이를 포함하는 사용자 정보를 정의해야 합니다.
그러면 protobuf 파일의 형식은 다음과 같습니다.
Note: 여기서는 proto3가 사용되었습니다. 이미 관련 설명을 작성했으므로 여기서는 자세히 설명하지 않겠습니다. 한 가지 주의할 점은 proto 파일과 생성된 Java 파일의 이름이 동일할 수 없다는 것입니다!

syntax = "proto3";
// 生成的包名
option java_package="com.pancm.protobuf";
//生成的java名
option java_outer_classname = "UserInfo";

message UserMsg {  
      
     // ID  
     int32 id = 1;  
      
    // 姓名  
     string name = 2;  
      
    // 年龄  
      int32 age = 3;  
      
     // 状态 
     int32 state = 4;  
}

파일을 생성한 후 파일과 protoc.exe(Java를 생성하는 소프트웨어)를 넣습니다. 파일)을 E 드라이브에 있는 디렉토리의 protobuf 폴더 아래에 있는 디렉토리의 dos 인터페이스에 protoc.exe --java_out=文件绝对路径名称를 입력합니다.
예:

protoc.exe --java_out=E:\protobuf User.proto

입력 후 Enter를 누르면 동일한 레벨 디렉터리에 생성된 Java 파일이 표시되며, 프로젝트에서 파일이 지정한 경로에 파일을 배치합니다.

참고: 또한 protobuf 파일 소프트웨어와 테스트 protobuf 파일을 이 프로젝트에 통합했으며 직접 얻을 수 있습니다.

Java 파일이 생성된 후, 어떻게 사용하는지 살펴보겠습니다.
여기에 코드를 직접 붙여넣고 코드에 설명을 적어주시면 이해가 더 쉬울 것 같습니다. . .
코드 예:

     // 按照定义的数据结构,创建一个对象  
        UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();  
        userInfo.setId(1);
        userInfo.setName("xuwujing");
        userInfo.setAge(18);
        UserInfo.UserMsg userMsg = userInfo.build();  
        // 将数据写到输出流 
        ByteArrayOutputStream output = new ByteArrayOutputStream();  
        userMsg.writeTo(output);  
        // 将数据序列化后发送 
        byte[] byteArray = output.toByteArray();  
        // 接收到流并读取
        ByteArrayInputStream input = new ByteArrayInputStream(byteArray);  
        // 反序列化  
        UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);  
        System.out.println("id:" + userInfo2.getId());  
        System.out.println("name:" + userInfo2.getName());  
        System.out.println("age:" + userInfo2.getAge());

참고: protobuf는 바이너리를 통해 전송되므로 해당 인코딩에 주의해야 합니다. protobuf를 사용할 때는 전송의 최대 바이트 길이에도 주의해야 합니다.

출력 결과:

id:1
name:xuwujing
age:18

SpringBoot는 Netty를 통합합니다

참고: 프로젝트를 직접 가져오려면 맨 아래로 바로 이동하여 링크를 통해 프로젝트 코드를 다운로드할 수 있습니다.

개발 준비

환경 요구사항
JDK:: 1.8
Netty:: 4.0 이상(5 제외)
Protobuf: 3.0 이상

에 익숙하지 않은 경우 네티, 너 내 이전 기사 중 일부를 확인할 수 있습니다. 맙소사, 무시해주세요~. ~
주소 : https://blog.csdn.net/column/details/17640.html

먼저 Maven 관련 의존성 :

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <netty.version>4.1.22.Final</netty.version>
        <protobuf.version>3.5.1</protobuf.version>
        <springboot>1.5.9.RELEASE</springboot>
        <fastjson>1.2.41</fastjson>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>


    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${springboot}</version>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>${springboot}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <version>${springboot}</version>
            <optional>true</optional>
        </dependency>

        
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson}</version>
        </dependency>

        
    <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency> 
</dependencies>

해당 Maven 의존성 추가 후 구성에 추가할 것이 없습니다. 예, 당분간은 단지 수신 포트일 뿐입니다.

코드 작성

코드 모듈은 크게 서버와 클라이언트로 나누어집니다.
주로 구현되는 비즈니스 로직:
서버가 성공적으로 시작되면 클라이언트도 성공적으로 시작됩니다. 이때 서버는 클라이언트에게 protobuf 형식으로 메시지를 보내고 클라이언트는 이에 상응하는 응답을 제공합니다. 클라이언트와 서버가 성공적으로 연결되면 클라이언트는 클라이언트가 저장되었음을 서버에 알리기 위해 일정 시간마다 서버에 하트비트 명령을 보냅니다. 클라이언트가 지정된 시간에 정보를 보내지 않으면 서버가 닫힙니다. 서버와의 연결. 클라이언트가 서버에 연결할 수 없으면 다시 연결이 성공할 때까지 가끔씩 다시 연결을 시도합니다!

Server

첫 번째 단계는 서버의 시작 클래스를 작성하고 해당 주석은 다음과 같이 작성됩니다. 음, 여기서는 너무 자세히 설명하지 않겠습니다. 그러나 제가 작성한 이전 Netty 기사에서는 서버가 main 메소드를 통해 직접 시작되었으므로 객체가 직접적으로 새로운 것이라는 점에 유의해야 합니다. SpringBoot와 통합한 후 관리를 위해 Netty를 springBoot에 넘겨야 하므로 여기서는 해당 주석을 사용합니다.
코드는 다음과 같습니다.

@Service("nettyServer")
public class NettyServer {
    private static final int port = 9876; // 设置服务端端口
    private static EventLoopGroup boss = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
    private static EventLoopGroup work = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
    private static ServerBootstrap b = new ServerBootstrap();
    
    @Autowired
    private NettyServerFilter nettyServerFilter;
    
    
    public void run() {
        try {
            b.group(boss, work);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerFilter); // 设置过滤器
            // 服务器绑定端口监听
            ChannelFuture f = b.bind(port).sync();
            System.out.println("服务端启动成功,端口是:" + port);
            // 监听服务器关闭监听
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 关闭EventLoopGroup,释放掉所有资源包括创建的线程
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

서버의 메인 클래스를 작성한 후 해당 필터 조건을 설정해 보겠습니다.
여기서 Netty에서 ChannelInitializer 클래스를 상속한 다음 initChannel 메서드를 다시 작성하여 하트비트 시간 제한 설정, 전송 프로토콜 설정 및 해당 비즈니스 구현 클래스와 같은 해당 설정을 추가해야 합니다.
코드는 다음과 같습니다.

    @Component
     public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
    
    @Autowired
    private NettyServerHandler nettyServerHandler;
    
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline ph = ch.pipeline();
      
         //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
         ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
         // 解码和编码,应和客户端一致
         //传输的协议 Protobuf
         ph.addLast(new ProtobufVarint32FrameDecoder());
         ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
         ph.addLast(new ProtobufVarint32LengthFieldPrepender());
         ph.addLast(new ProtobufEncoder());
         
         //业务逻辑实现类
         ph.addLast("nettyServerHandler", nettyServerHandler);
       }
     }

服务相关的设置的代码写完之后,我们再来编写主要的业务代码。
使用Netty编写业务层的代码,我们需要继承ChannelInboundHandlerAdapterSimpleChannelInboundHandler类,在这里顺便说下它们两的区别吧。
继承SimpleChannelInboundHandler类之后,会在接收到数据后会自动release掉数据占用的Bytebuffer资源。并且继承该类需要指定数据格式。
而继承ChannelInboundHandlerAdapter则不会自动释放,需要手动调用ReferenceCountUtil.release()等方法进行释放。继承该类不需要指定数据格式。
所以在这里,个人推荐服务端继承ChannelInboundHandlerAdapter,手动进行释放,防止数据未处理完就自动释放了。而且服务端可能有多个客户端进行连接,并且每一个客户端请求的数据格式都不一致,这时便可以进行相应的处理。
客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进行格式的转换了。

代码如下:

@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** 空闲次数 */
    private int idle_count = 1;
    /** 发送次数 */
    private int count = 1;


    /**
     * 建立连接时,发送一条消息
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
        UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
                .build();
        ctx.writeAndFlush(userMsg);
        super.channelActive(ctx);
    }

    /**
     * 超时处理 如果5秒没有接受客户端的心跳,就触发; 如果超过两次,则直接关闭;
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令
                System.out.println("已经5秒没有接收到客户端的信息了");
                if (idle_count > 1) {
                    System.out.println("关闭这个不活跃的channel");
                    ctx.channel().close();
                }
                idle_count++;
            }
        } else {
            super.userEventTriggered(ctx, obj);
        }
    }

    /**
     * 业务逻辑处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("第" + count + "次" + ",服务端接受的消息:" + msg);
        try {
            // 如果是protobuf类型的数据
          if (msg instanceof UserMsg) {
                UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
                if (userState.getState() == 1) {
                    System.out.println("客户端业务处理成功!");
                } else if(userState.getState() == 2){
                    System.out.println("接受到客户端发送的心跳!");
                }else{
                    System.out.println("未知命令!");
                }
            } else {
                System.out.println("未知数据!" + msg);
                return;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
        count++;
    }

    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

还有个服务端的启动类,之前是通过main方法直接启动, 不过这里改成了通过springBoot进行启动,差别不大。
代码如下:

@SpringBootApplication
public class NettyServerApp {

    public static void main(String[] args) {
        // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
        ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
        NettyServer nettyServer = context.getBean(NettyServer.class);
        nettyServer.run();
    }

}

到这里服务端相应的代码就编写完毕了。

客户端

客户端这边的代码和服务端的很多地方都类似,我就不再过多细说了,主要将一些不同的代码拿出来简单的讲述下。
首先是客户端的主类,基本和服务端的差不多,也就是多了监听的端口和一个监听器(用来监听是否和服务端断开连接,用于重连)。
主要实现的代码逻辑如下:

    public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
        ChannelFuture f = null;
        try {
            if (bootstrap != null) {
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.handler(nettyClientFilter);
                bootstrap.remoteAddress(host, port);
                f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
                    final EventLoop eventLoop = futureListener.channel().eventLoop();
                    if (!futureListener.isSuccess()) {
                        System.out.println("与服务端断开连接!在10s之后准备尝试重连!");
                        eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
                    }
                });
                if(initFalg){
                    System.out.println("Netty客户端启动成功!");
                    initFalg=false;
                }
                // 阻塞
                f.channel().closeFuture().sync();
            }
        } catch (Exception e) {
            System.out.println("客户端连接失败!"+e.getMessage());
        }
    }

注:监听器这块的实现用的是JDK1.8的写法。

客户端过滤其这块基本和服务端一直。不过需要注意的是,传输协议、编码和解码应该一致,还有心跳的读写时间应该小于服务端所设置的时间。
改动的代码如下:

    ChannelPipeline ph = ch.pipeline();
        /*
         * 解码和编码,应和服务端一致
         * */
        //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
        ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

客户端的业务代码逻辑。
主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。
这里比服务端多个个注解, 该注解Sharable主要是为了多个handler可以被多个channel安全地共享,也就是保证线程安全。
废话就不多说了,代码如下:

    @Service("nettyClientHandler")
    @ChannelHandler.Sharable
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private NettyClient nettyClient;
    
    /** 循环次数 */
    private int fcount = 1;
    
    /**
     * 建立连接时
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("建立连接时:" + new Date());
        ctx.fireChannelActive();
    }

    /**
     * 关闭连接时
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("关闭连接时:" + new Date());
        final EventLoop eventLoop = ctx.channel().eventLoop();
        nettyClient.doConnect(new Bootstrap(), eventLoop);
        super.channelInactive(ctx);
    }

    /**
     * 心跳请求处理 每4秒发送一次心跳请求;
     * 
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        System.out.println("循环请求的时间:" + new Date() + ",次数" + fcount);
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态,就发送心跳命令
                UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
                ctx.channel().writeAndFlush(userState);
                fcount++;
            }
        }
    }

    /**
     * 业务逻辑处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 如果不是protobuf类型的数据
        if (!(msg instanceof UserMsg)) {
            System.out.println("未知数据!" + msg);
            return;
        }
        try {

            // 得到protobuf的数据
            UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
            // 进行相应的业务处理。。。
            // 这里就从简了,只是打印而已
            System.out.println(
                    "客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge());

            // 这里返回一个已经接受到数据的状态
            UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
            ctx.writeAndFlush(userState);
            System.out.println("成功发送给服务端!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
     }
    }

那么到这里客户端的代码也编写完毕了。

功能测试

首先启动服务端,然后再启动客户端。
我们来看看结果是否如上述所说。

服务端输出结果:

服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53319
第1次,服务端接受的消息:state: 1

客户端业务处理成功!
第2次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2

接受到客户端发送的心跳!

客户端输入结果:

Netty客户端启动成功!
建立连接时:Mon Jul 16 23:31:58 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3
循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4

通过打印信息可以看出如上述所说。

接下来我们再来看看客户端是否能够实现重连。
先启动客户端,再启动服务端。

客户端输入结果:

Netty客户端启动成功!
与服务端断开连接!在10s之后准备尝试重连!
客户端连接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
建立连接时:Mon Jul 16 23:41:33 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3

服务端输出结果:

服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53492
第1次,服务端接受的消息:state: 1

客户端业务处理成功!
第2次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2

结果也如上述所说!

关于SpringBoot整合Netty使用Protobuf进行数据传输到这里就结束了。
SpringBoot整合Netty使用Protobuf进行数据传输的项目工程地址:
https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf

对了,也有不使用springBoot整合的Netty项目工程地址:
https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf

相关推荐: 

SpringBoot动态管理定时任务的实现代码

使用Spring Cloud Netflix Zuul代理网关访问后台REST服务的实现(代码)

위 내용은 Netty를 통합하고 데이터 전송을 위해 Protobuf를 사용하는 SpringBoot 구현 프로세스의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.