首頁 >Java >java教程 >SpringBoot整合Netty並使用Protobuf進行資料傳輸的實作過程

SpringBoot整合Netty並使用Protobuf進行資料傳輸的實作過程

不言
不言原創
2018-09-12 16:20:273318瀏覽

這篇文章帶給大家的內容是關於SpringBoot整合Netty並使用Protobuf進行資料傳輸的實現過程,有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。

前言

本篇文章主要介紹的是SpringBoot整合Netty以及使用Protobuf進行資料傳輸的相關內容。 Protobuf會簡單的介紹下用法,至於Netty在之前的文章中已經簡單的介紹過了,這裡就不再過多細說了。

Protobuf

介紹

protocolbuffer(以下簡稱PB)是google 的一種資料交換的格式,它獨立於語言,獨立於平台。 google 提供了多種語言的實作:java、c#、c 、go 和python,每一種實作都包含了對應語言的編譯器以及函式庫檔案。由於它是一種二進制的格式,比使用 xml進行資料交換快許多。可以把它用於分散式應用之間的資料通訊或異質環境下的資料交換。作為一種效率和相容性都很優秀的二進位資料傳輸格式,可以用於諸如網路傳輸、設定檔、資料儲存等許多領域。

官方網址:https://github.com/google/protobuf

使用

這裡的使用就只介紹Java相關的使用。
首先我們需要建立一個proto文件,在該文件定義我們需要傳輸的文件。
例如我們需要定義一個使用者的信息,包含的欄位主要有編號、名稱、年齡。
那麼該protobuf檔案的格式如下:
:這裡使用的是proto3,相關的註解我已經寫了,這裡便不再過多講述了。需要注意一點的是proto檔案和產生的Java檔案名稱不能一致!

syntax = "proto3";
// 生成的包名
option java_package="com.pancm.protobuf";
//生成的java名
option java_outer_classname = "UserInfo";

message UserMsg {  
      
     // ID  
     int32 id = 1;  
      
    // 姓名  
     string name = 2;  
      
    // 年龄  
      int32 age = 3;  
      
     // 状态 
     int32 state = 4;  
}

建立好該檔案之後,我們把該檔案和protoc .exe(產生Java檔案的軟體)放到E碟目錄下的protobuf資料夾下,然後到該目錄的dos介面下輸入:protoc.exe --java_out=檔案絕對路徑名稱
例如:

protoc.exe --java_out=E:\protobuf User.proto

輸入完後,回車即可在同級目錄看到已經產生好的Java文件,然後將該文件放到專案中該文件指定的路徑下即可。

註:產生protobuf的檔案軟體和測試的protobuf檔案我也整合到該專案中了,可以直接取得的。

Java檔案產生好之後,我們再來看怎麼使用。
這裡我就直接貼程式碼了,並且將註解寫在程式碼中,應該更容易理解些。 。 。
程式碼範例:

     // 按照定义的数据结构,创建一个对象  
        UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();  
        userInfo.setId(1);
        userInfo.setName("xuwujing");
        userInfo.setAge(18);
        UserInfo.UserMsg userMsg = userInfo.build();  
        // 将数据写到输出流 
        ByteArrayOutputStream output = new ByteArrayOutputStream();  
        userMsg.writeTo(output);  
        // 将数据序列化后发送 
        byte[] byteArray = output.toByteArray();  
        // 接收到流并读取
        ByteArrayInputStream input = new ByteArrayInputStream(byteArray);  
        // 反序列化  
        UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);  
        System.out.println("id:" + userInfo2.getId());  
        System.out.println("name:" + userInfo2.getName());  
        System.out.println("age:" + userInfo2.getAge());

附註:這裡說明一點,因為protobuf是透過二進位傳輸,所以需要注意下對應的編碼。還有使用protobuf也需要注意一次傳輸的最大位元組長度。

輸出結果:

id:1
name:xuwujing
age:18

SpringBoot整合Netty

說明:如果想直接取得工程那麼可以直接跳到底部,透過鏈接下載工程代碼。

開發準備

環境需求
#JDK::1.8
Netty:: 4.0或以上(不含5)
Protobuf:3.0以上

如果對Netty不熟的話,可以看看我之前寫的一些文章。大神請無視~。 ~
位址:https://blog.csdn.net/column/details/17640.html

首先還是Maven的相關依賴:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <netty.version>4.1.22.Final</netty.version>
        <protobuf.version>3.5.1</protobuf.version>
        <springboot>1.5.9.RELEASE</springboot>
        <fastjson>1.2.41</fastjson>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>


    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${springboot}</version>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>${springboot}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <version>${springboot}</version>
            <optional>true</optional>
        </dependency>

        
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson}</version>
        </dependency>

        
    <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency> 
</dependencies>

新增了對應的maven依賴之後,設定檔這塊暫時沒有什麼可以加的,因為暫時就一個監聽的埠而已。

程式碼編寫

程式碼模組主要分為服務端和客戶端。
主要實現的業務邏輯:
服務端啟動成功之後,客戶端也啟動成功,這時服務端會發送一條protobuf格式的訊息給客戶端,然後客戶端給予相應的應答。在客戶端與服務端連線成功之後,客戶端每個一段時間會發送心跳指令給服務端,告訴服務端該客戶端還存過中,如果客戶端沒有在指定的時間發送訊息,服務端會關閉與該客戶端的連線。當客戶端無法連接到服務端之後,會每隔一段時間去嘗試重連,只到重連成功!

服務端

首先是編寫服務端的啟動類,相應的註釋在程式碼中寫得很詳細了,這裡也不再多講了。不過要注意的是,在之前的我寫的Netty文章中,是透過main方法直接啟動服務端,因此是直接new一個物件的。而在和SpringBoot整合之後,我們需要將Netty交給springBoot去管理,所以這裡就用了相應的註解。
程式碼如下:

@Service("nettyServer")
public class NettyServer {
    private static final int port = 9876; // 设置服务端端口
    private static EventLoopGroup boss = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
    private static EventLoopGroup work = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
    private static ServerBootstrap b = new ServerBootstrap();
    
    @Autowired
    private NettyServerFilter nettyServerFilter;
    
    
    public void run() {
        try {
            b.group(boss, work);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerFilter); // 设置过滤器
            // 服务器绑定端口监听
            ChannelFuture f = b.bind(port).sync();
            System.out.println("服务端启动成功,端口是:" + port);
            // 监听服务器关闭监听
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 关闭EventLoopGroup,释放掉所有资源包括创建的线程
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

服務端主類別寫完畢之後,我們再來設定下對應的篩選條件。
這裡需要繼承Netty中ChannelInitializer類,然後重寫initChannel該方法,進行添加相應的設置,如心跳超時設置,傳輸協議設置,以及相應的業務實現類。
程式碼如下:

#
    @Component
     public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
    
    @Autowired
    private NettyServerHandler nettyServerHandler;
    
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline ph = ch.pipeline();
      
         //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
         ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
         // 解码和编码,应和客户端一致
         //传输的协议 Protobuf
         ph.addLast(new ProtobufVarint32FrameDecoder());
         ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
         ph.addLast(new ProtobufVarint32LengthFieldPrepender());
         ph.addLast(new ProtobufEncoder());
         
         //业务逻辑实现类
         ph.addLast("nettyServerHandler", nettyServerHandler);
       }
     }

服务相关的设置的代码写完之后,我们再来编写主要的业务代码。
使用Netty编写业务层的代码,我们需要继承ChannelInboundHandlerAdapterSimpleChannelInboundHandler类,在这里顺便说下它们两的区别吧。
继承SimpleChannelInboundHandler类之后,会在接收到数据后会自动release掉数据占用的Bytebuffer资源。并且继承该类需要指定数据格式。
而继承ChannelInboundHandlerAdapter则不会自动释放,需要手动调用ReferenceCountUtil.release()等方法进行释放。继承该类不需要指定数据格式。
所以在这里,个人推荐服务端继承ChannelInboundHandlerAdapter,手动进行释放,防止数据未处理完就自动释放了。而且服务端可能有多个客户端进行连接,并且每一个客户端请求的数据格式都不一致,这时便可以进行相应的处理。
客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进行格式的转换了。

代码如下:

@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** 空闲次数 */
    private int idle_count = 1;
    /** 发送次数 */
    private int count = 1;


    /**
     * 建立连接时,发送一条消息
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
        UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
                .build();
        ctx.writeAndFlush(userMsg);
        super.channelActive(ctx);
    }

    /**
     * 超时处理 如果5秒没有接受客户端的心跳,就触发; 如果超过两次,则直接关闭;
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令
                System.out.println("已经5秒没有接收到客户端的信息了");
                if (idle_count > 1) {
                    System.out.println("关闭这个不活跃的channel");
                    ctx.channel().close();
                }
                idle_count++;
            }
        } else {
            super.userEventTriggered(ctx, obj);
        }
    }

    /**
     * 业务逻辑处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("第" + count + "次" + ",服务端接受的消息:" + msg);
        try {
            // 如果是protobuf类型的数据
          if (msg instanceof UserMsg) {
                UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
                if (userState.getState() == 1) {
                    System.out.println("客户端业务处理成功!");
                } else if(userState.getState() == 2){
                    System.out.println("接受到客户端发送的心跳!");
                }else{
                    System.out.println("未知命令!");
                }
            } else {
                System.out.println("未知数据!" + msg);
                return;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
        count++;
    }

    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

还有个服务端的启动类,之前是通过main方法直接启动, 不过这里改成了通过springBoot进行启动,差别不大。
代码如下:

@SpringBootApplication
public class NettyServerApp {

    public static void main(String[] args) {
        // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
        ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
        NettyServer nettyServer = context.getBean(NettyServer.class);
        nettyServer.run();
    }

}

到这里服务端相应的代码就编写完毕了。

客户端

客户端这边的代码和服务端的很多地方都类似,我就不再过多细说了,主要将一些不同的代码拿出来简单的讲述下。
首先是客户端的主类,基本和服务端的差不多,也就是多了监听的端口和一个监听器(用来监听是否和服务端断开连接,用于重连)。
主要实现的代码逻辑如下:

    public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
        ChannelFuture f = null;
        try {
            if (bootstrap != null) {
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.handler(nettyClientFilter);
                bootstrap.remoteAddress(host, port);
                f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
                    final EventLoop eventLoop = futureListener.channel().eventLoop();
                    if (!futureListener.isSuccess()) {
                        System.out.println("与服务端断开连接!在10s之后准备尝试重连!");
                        eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
                    }
                });
                if(initFalg){
                    System.out.println("Netty客户端启动成功!");
                    initFalg=false;
                }
                // 阻塞
                f.channel().closeFuture().sync();
            }
        } catch (Exception e) {
            System.out.println("客户端连接失败!"+e.getMessage());
        }
    }

注:监听器这块的实现用的是JDK1.8的写法。

客户端过滤其这块基本和服务端一直。不过需要注意的是,传输协议、编码和解码应该一致,还有心跳的读写时间应该小于服务端所设置的时间。
改动的代码如下:

    ChannelPipeline ph = ch.pipeline();
        /*
         * 解码和编码,应和服务端一致
         * */
        //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
        ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

客户端的业务代码逻辑。
主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。
这里比服务端多个个注解, 该注解Sharable主要是为了多个handler可以被多个channel安全地共享,也就是保证线程安全。
废话就不多说了,代码如下:

    @Service("nettyClientHandler")
    @ChannelHandler.Sharable
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private NettyClient nettyClient;
    
    /** 循环次数 */
    private int fcount = 1;
    
    /**
     * 建立连接时
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("建立连接时:" + new Date());
        ctx.fireChannelActive();
    }

    /**
     * 关闭连接时
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("关闭连接时:" + new Date());
        final EventLoop eventLoop = ctx.channel().eventLoop();
        nettyClient.doConnect(new Bootstrap(), eventLoop);
        super.channelInactive(ctx);
    }

    /**
     * 心跳请求处理 每4秒发送一次心跳请求;
     * 
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        System.out.println("循环请求的时间:" + new Date() + ",次数" + fcount);
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态,就发送心跳命令
                UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
                ctx.channel().writeAndFlush(userState);
                fcount++;
            }
        }
    }

    /**
     * 业务逻辑处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 如果不是protobuf类型的数据
        if (!(msg instanceof UserMsg)) {
            System.out.println("未知数据!" + msg);
            return;
        }
        try {

            // 得到protobuf的数据
            UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
            // 进行相应的业务处理。。。
            // 这里就从简了,只是打印而已
            System.out.println(
                    "客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge());

            // 这里返回一个已经接受到数据的状态
            UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
            ctx.writeAndFlush(userState);
            System.out.println("成功发送给服务端!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
     }
    }

那么到这里客户端的代码也编写完毕了。

功能测试

首先启动服务端,然后再启动客户端。
我们来看看结果是否如上述所说。

服务端输出结果:

服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53319
第1次,服务端接受的消息:state: 1

客户端业务处理成功!
第2次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2

接受到客户端发送的心跳!

客户端输入结果:

Netty客户端启动成功!
建立连接时:Mon Jul 16 23:31:58 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3
循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4

通过打印信息可以看出如上述所说。

接下来我们再来看看客户端是否能够实现重连。
先启动客户端,再启动服务端。

客户端输入结果:

Netty客户端启动成功!
与服务端断开连接!在10s之后准备尝试重连!
客户端连接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
建立连接时:Mon Jul 16 23:41:33 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3

服务端输出结果:

服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53492
第1次,服务端接受的消息:state: 1

客户端业务处理成功!
第2次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2

结果也如上述所说!

关于SpringBoot整合Netty使用Protobuf进行数据传输到这里就结束了。
SpringBoot整合Netty使用Protobuf进行数据传输的项目工程地址:
https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf

对了,也有不使用springBoot整合的Netty项目工程地址:
https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf

相关推荐: 

SpringBoot动态管理定时任务的实现代码

使用Spring Cloud Netflix Zuul代理网关访问后台REST服务的实现(代码)

以上是SpringBoot整合Netty並使用Protobuf進行資料傳輸的實作過程的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn