ホームページ >Java >&#&チュートリアル >Nettyを統合し、データ送信にProtobufを使用するSpringBootの実装プロセス

Nettyを統合し、データ送信にProtobufを使用するSpringBootの実装プロセス

不言
不言オリジナル
2018-09-12 16:20:273363ブラウズ

この記事の内容は、SpringBoot を統合してデータ転送に Protobuf を使用するプロセスに関するもので、必要な方は参考にしていただければ幸いです。

まえがき

この記事では、SpringBoot による Netty の統合と、データ転送における Protobuf の使用について主に紹介します。 Protobuf の使い方を簡単に紹介します。Netty については以前の記事で簡単に紹介したので、ここでは詳しく説明しません。

Protobuf

はじめに

protocolbuffer (以下、PB) は、言語やプラットフォームに依存しない Google のデータ交換フォーマットです。 Google では、Java、C#、C++、go、Python などの複数の言語で実装を提供しています。各実装には、対応する言語のコンパイラとライブラリ ファイルが含まれています。バイナリ形式であるため、データ交換に XML を使用するよりもはるかに高速です。分散アプリケーション間のデータ通信や異種環境でのデータ交換に使用できます。効率性と互換性に優れたバイナリデータ伝送形式として、ネットワーク伝送、設定ファイル、データストレージなど幅広い分野で利用可能です。

公式アドレス: https://github.com/google/protobuf

使用方法

ここでの使用方法は、Java 関連の使用方法のみを紹介します。
まず、転送する必要があるファイルを定義する proto ファイルを作成する必要があります。
たとえば、主に番号、名前、年齢などのユーザー情報を定義する必要があります。
protobuf ファイルの形式は次のとおりです。
: ここでは proto3 が使用されています。関連するコメントはすでに書いているので、ここでは詳しく説明しません。注意すべき点は、proto ファイルと生成された Java ファイルの名前を同じにすることはできないということです。

syntax = "proto3";
// 生成的包名
option java_package="com.pancm.protobuf";
//生成的java名
option java_outer_classname = "UserInfo";

message UserMsg {  
      
     // ID  
     int32 id = 1;  
      
    // 姓名  
     string name = 2;  
      
    // 年龄  
      int32 age = 3;  
      
     // 状态 
     int32 state = 4;  
}

ファイルを作成した後、そのファイルと

protoc.exe (Java を生成するソフトウェア) を配置します。ファイル) 上の E ドライブ上の protobuf フォルダーの下で、ディレクトリの dos インターフェイスに と入力します。 protoc.exe --java_out=文件绝对路径名称例:

protoc.exe --java_out=E:\protobuf User.proto

入力後、Enterを押すと、生成されたJavaファイルが同階層のディレクトリに表示され、プロジェクト内のファイルで指定されたパスにファイルを配置します。

注: protobuf ファイル ソフトウェアとテスト protobuf ファイルもこのプロジェクトに統合しており、直接取得できます。

Java ファイルが生成されたら、その使用方法を見てみましょう。

ここにコードを直接貼り付け、コード内にコメントを書きます。これにより、理解しやすくなるはずです。 。 。

コード例:

     // 按照定义的数据结构,创建一个对象  
        UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();  
        userInfo.setId(1);
        userInfo.setName("xuwujing");
        userInfo.setAge(18);
        UserInfo.UserMsg userMsg = userInfo.build();  
        // 将数据写到输出流 
        ByteArrayOutputStream output = new ByteArrayOutputStream();  
        userMsg.writeTo(output);  
        // 将数据序列化后发送 
        byte[] byteArray = output.toByteArray();  
        // 接收到流并读取
        ByteArrayInputStream input = new ByteArrayInputStream(byteArray);  
        // 反序列化  
        UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);  
        System.out.println("id:" + userInfo2.getId());  
        System.out.println("name:" + userInfo2.getName());  
        System.out.println("age:" + userInfo2.getAge());

注:

protobufはバイナリで送信されるため、対応するエンコーディングに注意する必要があるため、ここで注意してください。 protobuf を使用する場合は、送信の最大バイト長にも注意する必要があります。

出力結果:

id:1
name:xuwujing
age:18

SpringBoot は Netty を統合します

注: プロジェクトを直接取得したい場合は、一番下に直接ジャンプし、リンクからプロジェクト コードをダウンロードできます。

開発準備

環境要件
JDK:: 1.8
Netty:: 4.0以上(5を除く)
Protobuf: 3.0以上

Nettyに慣れていない方、あなた私の以前の記事をいくつかチェックしてください。神様、無視してください~。 ~

アドレス: https://blog.csdn.net/column/details/17640.html

まず、Maven 関連の依存関係:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <netty.version>4.1.22.Final</netty.version>
        <protobuf.version>3.5.1</protobuf.version>
        <springboot>1.5.9.RELEASE</springboot>
        <fastjson>1.2.41</fastjson>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>


    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${springboot}</version>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>${springboot}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <version>${springboot}</version>
            <optional>true</optional>
        </dependency>

        
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson}</version>
        </dependency>

        
    <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency> 
</dependencies>

対応する Maven の依存関係を追加した後は、設定に追加するものは何もありませんはい、当面は単なるリスニングポートであるためです。

コード作成

コードモジュールは主にサーバーとクライアントに分かれています。

主に実装されたビジネスロジック:
サーバーが正常に起動すると、クライアントも正常に起動します。このとき、サーバーは
protobuf形式のメッセージをクライアントに送信し、クライアントは対応する応答を返します。クライアントとサーバーが正常に接続されると、クライアントは一定期間ごとにハートビート コマンドをサーバーに送信し、クライアントが指定された時間に情報を送信しない場合、サーバーは閉じられます。サーバーとの接続。クライアントがサーバーに接続できない場合、再接続が成功するまで時々再接続を試みます!

サーバー
最初のステップはサーバーの起動クラスを作成することであり、対応するコメントはコードの詳細 さて、ここではあまり詳しく説明しません。ただし、私が以前に書いた Netty の記事では、サーバーは main メソッドを通じて直接起動されていたため、オブジェクトは直接新しいものであったことに注意してください。 SpringBoot と統合した後、管理のために Netty を springBoot に引き渡す必要があるため、ここでは対応するアノテーションが使用されます。


コードは次のとおりです:

@Service("nettyServer")
public class NettyServer {
    private static final int port = 9876; // 设置服务端端口
    private static EventLoopGroup boss = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
    private static EventLoopGroup work = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
    private static ServerBootstrap b = new ServerBootstrap();
    
    @Autowired
    private NettyServerFilter nettyServerFilter;
    
    
    public void run() {
        try {
            b.group(boss, work);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerFilter); // 设置过滤器
            // 服务器绑定端口监听
            ChannelFuture f = b.bind(port).sync();
            System.out.println("服务端启动成功,端口是:" + port);
            // 监听服务器关闭监听
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 关闭EventLoopGroup,释放掉所有资源包括创建的线程
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

サーバーのメインクラスを記述したら、対応するフィルター条件を設定しましょう。

ここでは、Netty の
ChannelInitializer クラスを継承し、initChannel メソッドを書き換えて、ハートビート タイムアウト設定、送信プロトコル設定、対応するビジネス実装クラスなどの対応する設定を追加する必要があります。
コードは次のとおりです:

    @Component
     public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
    
    @Autowired
    private NettyServerHandler nettyServerHandler;
    
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline ph = ch.pipeline();
      
         //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
         ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
         // 解码和编码,应和客户端一致
         //传输的协议 Protobuf
         ph.addLast(new ProtobufVarint32FrameDecoder());
         ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
         ph.addLast(new ProtobufVarint32LengthFieldPrepender());
         ph.addLast(new ProtobufEncoder());
         
         //业务逻辑实现类
         ph.addLast("nettyServerHandler", nettyServerHandler);
       }
     }

服务相关的设置的代码写完之后,我们再来编写主要的业务代码。
使用Netty编写业务层的代码,我们需要继承ChannelInboundHandlerAdapterSimpleChannelInboundHandler类,在这里顺便说下它们两的区别吧。
继承SimpleChannelInboundHandler类之后,会在接收到数据后会自动release掉数据占用的Bytebuffer资源。并且继承该类需要指定数据格式。
而继承ChannelInboundHandlerAdapter则不会自动释放,需要手动调用ReferenceCountUtil.release()等方法进行释放。继承该类不需要指定数据格式。
所以在这里,个人推荐服务端继承ChannelInboundHandlerAdapter,手动进行释放,防止数据未处理完就自动释放了。而且服务端可能有多个客户端进行连接,并且每一个客户端请求的数据格式都不一致,这时便可以进行相应的处理。
客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进行格式的转换了。

代码如下:

@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** 空闲次数 */
    private int idle_count = 1;
    /** 发送次数 */
    private int count = 1;


    /**
     * 建立连接时,发送一条消息
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
        UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
                .build();
        ctx.writeAndFlush(userMsg);
        super.channelActive(ctx);
    }

    /**
     * 超时处理 如果5秒没有接受客户端的心跳,就触发; 如果超过两次,则直接关闭;
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令
                System.out.println("已经5秒没有接收到客户端的信息了");
                if (idle_count > 1) {
                    System.out.println("关闭这个不活跃的channel");
                    ctx.channel().close();
                }
                idle_count++;
            }
        } else {
            super.userEventTriggered(ctx, obj);
        }
    }

    /**
     * 业务逻辑处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("第" + count + "次" + ",服务端接受的消息:" + msg);
        try {
            // 如果是protobuf类型的数据
          if (msg instanceof UserMsg) {
                UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
                if (userState.getState() == 1) {
                    System.out.println("客户端业务处理成功!");
                } else if(userState.getState() == 2){
                    System.out.println("接受到客户端发送的心跳!");
                }else{
                    System.out.println("未知命令!");
                }
            } else {
                System.out.println("未知数据!" + msg);
                return;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
        count++;
    }

    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

还有个服务端的启动类,之前是通过main方法直接启动, 不过这里改成了通过springBoot进行启动,差别不大。
代码如下:

@SpringBootApplication
public class NettyServerApp {

    public static void main(String[] args) {
        // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
        ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
        NettyServer nettyServer = context.getBean(NettyServer.class);
        nettyServer.run();
    }

}

到这里服务端相应的代码就编写完毕了。

客户端

客户端这边的代码和服务端的很多地方都类似,我就不再过多细说了,主要将一些不同的代码拿出来简单的讲述下。
首先是客户端的主类,基本和服务端的差不多,也就是多了监听的端口和一个监听器(用来监听是否和服务端断开连接,用于重连)。
主要实现的代码逻辑如下:

    public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
        ChannelFuture f = null;
        try {
            if (bootstrap != null) {
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.handler(nettyClientFilter);
                bootstrap.remoteAddress(host, port);
                f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
                    final EventLoop eventLoop = futureListener.channel().eventLoop();
                    if (!futureListener.isSuccess()) {
                        System.out.println("与服务端断开连接!在10s之后准备尝试重连!");
                        eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
                    }
                });
                if(initFalg){
                    System.out.println("Netty客户端启动成功!");
                    initFalg=false;
                }
                // 阻塞
                f.channel().closeFuture().sync();
            }
        } catch (Exception e) {
            System.out.println("客户端连接失败!"+e.getMessage());
        }
    }

注:监听器这块的实现用的是JDK1.8的写法。

客户端过滤其这块基本和服务端一直。不过需要注意的是,传输协议、编码和解码应该一致,还有心跳的读写时间应该小于服务端所设置的时间。
改动的代码如下:

    ChannelPipeline ph = ch.pipeline();
        /*
         * 解码和编码,应和服务端一致
         * */
        //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
        ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

客户端的业务代码逻辑。
主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。
这里比服务端多个个注解, 该注解Sharable主要是为了多个handler可以被多个channel安全地共享,也就是保证线程安全。
废话就不多说了,代码如下:

    @Service("nettyClientHandler")
    @ChannelHandler.Sharable
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private NettyClient nettyClient;
    
    /** 循环次数 */
    private int fcount = 1;
    
    /**
     * 建立连接时
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("建立连接时:" + new Date());
        ctx.fireChannelActive();
    }

    /**
     * 关闭连接时
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("关闭连接时:" + new Date());
        final EventLoop eventLoop = ctx.channel().eventLoop();
        nettyClient.doConnect(new Bootstrap(), eventLoop);
        super.channelInactive(ctx);
    }

    /**
     * 心跳请求处理 每4秒发送一次心跳请求;
     * 
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        System.out.println("循环请求的时间:" + new Date() + ",次数" + fcount);
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态,就发送心跳命令
                UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
                ctx.channel().writeAndFlush(userState);
                fcount++;
            }
        }
    }

    /**
     * 业务逻辑处理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 如果不是protobuf类型的数据
        if (!(msg instanceof UserMsg)) {
            System.out.println("未知数据!" + msg);
            return;
        }
        try {

            // 得到protobuf的数据
            UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
            // 进行相应的业务处理。。。
            // 这里就从简了,只是打印而已
            System.out.println(
                    "客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge());

            // 这里返回一个已经接受到数据的状态
            UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
            ctx.writeAndFlush(userState);
            System.out.println("成功发送给服务端!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
     }
    }

那么到这里客户端的代码也编写完毕了。

功能测试

首先启动服务端,然后再启动客户端。
我们来看看结果是否如上述所说。

服务端输出结果:

服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53319
第1次,服务端接受的消息:state: 1

客户端业务处理成功!
第2次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2

接受到客户端发送的心跳!

客户端输入结果:

Netty客户端启动成功!
建立连接时:Mon Jul 16 23:31:58 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3
循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4

通过打印信息可以看出如上述所说。

接下来我们再来看看客户端是否能够实现重连。
先启动客户端,再启动服务端。

客户端输入结果:

Netty客户端启动成功!
与服务端断开连接!在10s之后准备尝试重连!
客户端连接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
建立连接时:Mon Jul 16 23:41:33 CST 2018
客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
成功发送给服务端!
循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1
循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2
循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3

服务端输出结果:

服务端启动成功,端口是:9876
连接的客户端地址:/127.0.0.1:53492
第1次,服务端接受的消息:state: 1

客户端业务处理成功!
第2次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第3次,服务端接受的消息:state: 2

接受到客户端发送的心跳!
第4次,服务端接受的消息:state: 2

结果也如上述所说!

关于SpringBoot整合Netty使用Protobuf进行数据传输到这里就结束了。
SpringBoot整合Netty使用Protobuf进行数据传输的项目工程地址:
https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf

对了,也有不使用springBoot整合的Netty项目工程地址:
https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf

相关推荐: 

SpringBoot动态管理定时任务的实现代码

使用Spring Cloud Netflix Zuul代理网关访问后台REST服务的实现(代码)

以上がNettyを統合し、データ送信にProtobufを使用するSpringBootの実装プロセスの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。