ホームページ >Java >&#&チュートリアル >Java による RPC フレームワークの手書き
RPC フレームワークはリモート呼び出しフレームワークと呼ばれ、その実装の中心原則は、コンシューマが動的プロキシを使用してインターフェイスをプロキシすることです (JDK に基づく動的プロキシ、もちろん CGLib を使えばインターフェースクラスを使わずにメソッドを直接使うことも可能です) ネットワーク送信プログラミングを追加することで、送信呼び出しインターフェースのメソッド名とメソッドパラメータがプロバイダーによって取得され、リフレクションを通じてインターフェースのメソッドが取得されます。が実行され、その後リフレクションが実行され、その結果がネットワーク プログラミングを通じて消費者に返されます。
次に、これらの概念を順番に実装してみましょう。ここでは最も単純な実装を行います。ネットワーク プログラミングでは BIO が使用されます。Reactor モードで Netty を使用すると、パフォーマンスが向上する方法で書き直すことができます。ネットワーク送信で使用されるシリアル化とデシリアライゼーションも Java ネイティブであり、もちろん、このような送信バイトは比較的大きく、Google の protoBuffer または kryo を使用して処理できます。 これは原理を説明するための便宜上のものです。
pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.guanjian</groupId> <artifactId>rpc-framework</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
まず、もちろん、リモートで呼び出すインターフェイスとインターフェイス メソッドです。
public interface HelloService { String sayHello(String content);}
インターフェイス実装クラス
public class HelloServiceImpl implements HelloService { public String sayHello(String content) { return "hello," + content; } }
コンシューマ側の動的プロキシ、プロバイダとコンシューマを2つのプロジェクトに記述する場合、プロバイダ側には上記のインターフェイスと実装クラスが必要ですが、コンシューマ側には上記のインターフェースのみが必要です。
public class ConsumerProxy { /** * 消费者端的动态代理 * @param interfaceClass 代理的接口类 * @param host 远程主机IP * @param port 远程主机端口 * @param <T> * @return */ @SuppressWarnings("unchecked") public static <T> T consume(final Class<T> interfaceClass,final String host,final int port) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, (proxy,method,args) -> { //创建一个客户端套接字 Socket socket = new Socket(host, port); try { //创建一个对外传输的对象流,绑定套接字 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { //将动态代理的方法名写入对外传输的对象流中 output.writeUTF(method.getName()); //将动态代理的方法的参数写入对外传输的对象流中 output.writeObject(args); //创建一个对内传输的对象流,绑定套接字 //这里是为了获取提供者端传回的结果 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { //从对内传输的对象流中获取结果 Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } ); } }
JDK 動的プロキシについては、AOP の原則と自己実装を参照してください。BIO については、従来の IO と NIO の比較を参照してください。
プロバイダー側ネットワーク伝送およびリモート呼び出しサービス
public class ProviderReflect { private static final ExecutorService executorService = Executors.newCachedThreadPool(); /** * RPC监听和远程方法调用 * @param service RPC远程方法调用的接口实例 * @param port 监听的端口 * @throws Exception */ public static void provider(final Object service,int port) throws Exception { //创建服务端的套接字,绑定端口port ServerSocket serverSocket = new ServerSocket(port); while (true) { //开始接收客户端的消息,并以此创建套接字 final Socket socket = serverSocket.accept(); //多线程执行,这里的问题是连接数过大,线程池的线程数会耗尽 executorService.execute(() -> { try { //创建呢一个对内传输的对象流,并绑定套接字 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { try { //从对象流中读取接口方法的方法名 String methodName = input.readUTF(); //从对象流中读取接口方法的所有参数 Object[] args = (Object[]) input.readObject(); Class[] argsTypes = new Class[args.length]; for (int i = 0;i < args.length;i++) { argsTypes[i] = args[i].getClass(); } //创建一个对外传输的对象流,并绑定套接字 //这里是为了将反射执行结果传递回消费者端 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { Class<?>[] interfaces = service.getClass().getInterfaces(); Method method = null; for (int i = 0;i < interfaces.length;i++) { method = interfaces[i].getDeclaredMethod(methodName,argsTypes); if (method != null) { break; } } Object result = method.invoke(service, args); //将反射执行结果写入对外传输的对象流中 output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { output.close(); } } catch (Exception e) { e.printStackTrace(); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { e.printStackTrace(); } }); } } }
プロバイダー側でネットワーク リスニングとリモート呼び出しを開始
public class RPCProviderMain { public static void main(String[] args) throws Exception { HelloService service = new HelloServiceImpl(); ProviderReflect.provider(service,8083); } }
コンシューマの動的プロキシ呼び出しを開始
public class RPCConsumerMain { public static void main(String[] args) throws InterruptedException { HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083); for (int i = 0;i < 1000;i++) { String hello = service.sayHello("你好_" + i); System.out.println(hello); Thread.sleep(1000); } } }
実行結果
こんにちは、こんにちは _0
こんにちは、こんにちは_1
こんにちは、こんにちは_2
こんにちは、こんにちは_3
こんにちは、こんにちは_4
こんにちは、こんにちは_5
....
Netty ProtoBuffer を使用して高性能 RPC フレームワークに拡張したい場合は、Protobuffer を統合した Netty の関連記述メソッドを参照してください。
推奨チュートリアル:「PHP」
以上がJava による RPC フレームワークの手書きの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。