ホームページ >Java >&#&チュートリアル >Java による RPC フレームワークの手書き

Java による RPC フレームワークの手書き

Guanhui
Guanhui転載
2020-06-17 17:30:413437ブラウズ

Java による RPC フレームワークの手書き

RPC フレームワークはリモート呼び出しフレームワークと呼ばれ、その実装の中心原則は、コンシューマが動的プロキシを使用してインターフェイスをプロキシすることです (JDK に基づく動的プロキシ、もちろん CGLib を使えばインターフェースクラスを使わずにメソッドを直接使うことも可能です) ネットワーク送信プログラミングを追加することで、送信呼び出しインターフェースのメソッド名とメソッドパラメータがプロバイダーによって取得され、リフレクションを通じてインターフェースのメソッドが取得されます。が実行され、その後リフレクションが実行され、その結果がネットワーク プログラミングを通じて消費者に返されます。

次に、これらの概念を順番に実装してみましょう。ここでは最も単純な実装を行います。ネットワーク プログラミングでは BIO が使用されます。Reactor モードで Netty を使用すると、パフォーマンスが向上する方法で書き直すことができます。ネットワーク送信で使用されるシリアル化とデシリアライゼーションも Java ネイティブであり、もちろん、このような送信バイトは比較的大きく、Google の protoBuffer または kryo を使用して処理できます。 これは原理を説明するための便宜上のものです。

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.guanjian</groupId>
    <artifactId>rpc-framework</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

まず、もちろん、リモートで呼び出すインターフェイスとインターフェイス メソッドです。

public interface HelloService {
    String sayHello(String content);}

インターフェイス実装クラス

public class HelloServiceImpl implements HelloService {    public String sayHello(String content) {        return "hello," + content;    }
}

コンシューマ側の動的プロキシ、プロバイダとコンシューマを2つのプロジェクトに記述する場合、プロバイダ側には上記のインターフェイスと実装クラスが必要ですが、コンシューマ側には上記のインターフェースのみが必要です。

public class ConsumerProxy {
    /**
     * 消费者端的动态代理
     * @param interfaceClass 代理的接口类
     * @param host 远程主机IP
     * @param port 远程主机端口
     * @param <T>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T consume(final Class<T> interfaceClass,final String host,final int port) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class[]{interfaceClass}, (proxy,method,args) -> {
                    //创建一个客户端套接字
                    Socket socket = new Socket(host, port);
                    try {
                        //创建一个对外传输的对象流,绑定套接字
                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                        try {
                            //将动态代理的方法名写入对外传输的对象流中
                            output.writeUTF(method.getName());
                            //将动态代理的方法的参数写入对外传输的对象流中
                            output.writeObject(args);
                            //创建一个对内传输的对象流,绑定套接字
                            //这里是为了获取提供者端传回的结果
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                //从对内传输的对象流中获取结果
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable) result;
                                }
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
        );
    }
}

JDK 動的プロキシについては、AOP の原則と自己実装を参照してください。BIO については、従来の IO と NIO の比較を参照してください。

プロバイダー側ネットワーク伝送およびリモート呼び出しサービス

public class ProviderReflect {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * RPC监听和远程方法调用
     * @param service RPC远程方法调用的接口实例
     * @param port 监听的端口
     * @throws Exception
     */
    public static void provider(final Object service,int port) throws Exception {
        //创建服务端的套接字,绑定端口port
        ServerSocket serverSocket = new ServerSocket(port);
        while (true) {
            //开始接收客户端的消息,并以此创建套接字
            final Socket socket = serverSocket.accept();
            //多线程执行,这里的问题是连接数过大,线程池的线程数会耗尽
            executorService.execute(() -> {
                try {
                    //创建呢一个对内传输的对象流,并绑定套接字
                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                    try {
                        try {
                            //从对象流中读取接口方法的方法名
                            String methodName = input.readUTF();
                            //从对象流中读取接口方法的所有参数
                            Object[] args = (Object[]) input.readObject();
                            Class[] argsTypes = new Class[args.length];
                            for (int i = 0;i < args.length;i++) {
                                argsTypes[i] = args[i].getClass();

                            }
                            //创建一个对外传输的对象流,并绑定套接字
                            //这里是为了将反射执行结果传递回消费者端
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                            try {
                                Class<?>[] interfaces = service.getClass().getInterfaces();
                                Method method = null;
                                for (int i = 0;i < interfaces.length;i++) {
                                    method = interfaces[i].getDeclaredMethod(methodName,argsTypes);
                                    if (method != null) {
                                        break;
                                    }
                                }
                                Object result = method.invoke(service, args);
                                //将反射执行结果写入对外传输的对象流中
                                output.writeObject(result);
                            } catch (Throwable t) {
                                output.writeObject(t);
                            } finally {
                                output.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            input.close();
                        }
                    } finally {
                        socket.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

プロバイダー側​​でネットワーク リスニングとリモート呼び出しを開始

public class RPCProviderMain {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        ProviderReflect.provider(service,8083);
    }
}

コンシューマの動的プロキシ呼び出しを開始

public class RPCConsumerMain {
    public static void main(String[] args) throws InterruptedException {
        HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083);
        for (int i = 0;i < 1000;i++) {
            String hello = service.sayHello("你好_" + i);
            System.out.println(hello);
            Thread.sleep(1000);
        }
    }
}

実行結果

こんにちは、こんにちは _0
こんにちは、こんにちは_1
こんにちは、こんにちは_2
こんにちは、こんにちは_3
こんにちは、こんにちは_4
こんにちは、こんにちは_5

....

Netty ProtoBuffer を使用して高性能 RPC フレームワークに拡張したい場合は、Protobuffer を統合した Netty の関連記述メソッドを参照してください。

推奨チュートリアル:「PHP

以上がJava による RPC フレームワークの手書きの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はoschina.netで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。