>Java >java지도 시간 >Java 필기 RPC 프레임워크

Java 필기 RPC 프레임워크

Guanhui
Guanhui앞으로
2020-06-17 17:30:413394검색

Java 필기 RPC 프레임워크

RPC 프레임워크를 원격 호출 프레임워크라고 합니다. 구현의 핵심 원칙은 소비자 측에서 인터페이스 메소드를 프록시하기 위해 동적 프록시를 사용한다는 것입니다(JDK 기반의 동적 프록시는 물론 CGLib를 사용하는 경우 인터페이스 클래스 없이 메소드를 직접 사용할 수 있습니다. ) 네트워크 전송 프로그래밍을 추가하여 공급자가 전송 호출 인터페이스 메소드 이름과 메소드 매개변수를 얻은 다음 리플렉션을 통해 인터페이스의 메소드를 실행하고 결과는 다음과 같습니다. 반사 실행은 네트워크 프로그래밍을 통해 소비자에게 다시 전송됩니다.

이제 이러한 개념을 하나씩 구현해 보겠습니다. 여기서는 가장 간단한 구현을 수행합니다. 네트워크 프로그래밍은 BIO를 사용하여 Reactor 모드에서 Netty를 사용하여 더 나은 성능으로 다시 작성할 수 있습니다. 네트워크 전송에 사용되는 직렬화 및 역직렬화도 Java에 기본적으로 적용됩니다. 물론 이러한 전송 바이트는 상대적으로 크며 Google의 protoBuffer 또는 kryo를 사용하여 처리할 수 있습니다. 이것은 단지 원리를 설명하기 위한 편의를 위한 것입니다.

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.guanjian</groupId>
    <artifactId>rpc-framework</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

우선 당연히 원격으로 호출하려는 인터페이스와 인터페이스 메소드입니다.

public interface HelloService {
    String sayHello(String content);}

인터페이스 구현 클래스

public class HelloServiceImpl implements HelloService {    public String sayHello(String content) {        return "hello," + content;    }
}

Consumer 측의 동적 프록시,두 프로젝트에 Provider와 Consumer를 작성한다면 Provider 측에는 위의 인터페이스와 구현 클래스가 필요하고 Consumer 측에는 위의 인터페이스만 필요합니다.

public class ConsumerProxy {
    /**
     * 消费者端的动态代理
     * @param interfaceClass 代理的接口类
     * @param host 远程主机IP
     * @param port 远程主机端口
     * @param <T>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T consume(final Class<T> interfaceClass,final String host,final int port) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class[]{interfaceClass}, (proxy,method,args) -> {
                    //创建一个客户端套接字
                    Socket socket = new Socket(host, port);
                    try {
                        //创建一个对外传输的对象流,绑定套接字
                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                        try {
                            //将动态代理的方法名写入对外传输的对象流中
                            output.writeUTF(method.getName());
                            //将动态代理的方法的参数写入对外传输的对象流中
                            output.writeObject(args);
                            //创建一个对内传输的对象流,绑定套接字
                            //这里是为了获取提供者端传回的结果
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                //从对内传输的对象流中获取结果
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable) result;
                                }
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
        );
    }
}

JDK 동적 프록시에 대한 자세한 내용은 AOP 원칙 및 자체 구현을 참조하세요. BIO의 경우 기존 IO와 NIO의 비교를 참조하세요.

공급자 측 네트워크 전송 및 원격 호출 서비스

public class ProviderReflect {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * RPC监听和远程方法调用
     * @param service RPC远程方法调用的接口实例
     * @param port 监听的端口
     * @throws Exception
     */
    public static void provider(final Object service,int port) throws Exception {
        //创建服务端的套接字,绑定端口port
        ServerSocket serverSocket = new ServerSocket(port);
        while (true) {
            //开始接收客户端的消息,并以此创建套接字
            final Socket socket = serverSocket.accept();
            //多线程执行,这里的问题是连接数过大,线程池的线程数会耗尽
            executorService.execute(() -> {
                try {
                    //创建呢一个对内传输的对象流,并绑定套接字
                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                    try {
                        try {
                            //从对象流中读取接口方法的方法名
                            String methodName = input.readUTF();
                            //从对象流中读取接口方法的所有参数
                            Object[] args = (Object[]) input.readObject();
                            Class[] argsTypes = new Class[args.length];
                            for (int i = 0;i < args.length;i++) {
                                argsTypes[i] = args[i].getClass();

                            }
                            //创建一个对外传输的对象流,并绑定套接字
                            //这里是为了将反射执行结果传递回消费者端
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                            try {
                                Class<?>[] interfaces = service.getClass().getInterfaces();
                                Method method = null;
                                for (int i = 0;i < interfaces.length;i++) {
                                    method = interfaces[i].getDeclaredMethod(methodName,argsTypes);
                                    if (method != null) {
                                        break;
                                    }
                                }
                                Object result = method.invoke(service, args);
                                //将反射执行结果写入对外传输的对象流中
                                output.writeObject(result);
                            } catch (Throwable t) {
                                output.writeObject(t);
                            } finally {
                                output.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            input.close();
                        }
                    } finally {
                        socket.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

공급자 측에서 네트워크 청취 시작 및 원격 호출

public class RPCProviderMain {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();
        ProviderReflect.provider(service,8083);
    }
}

소비자의 동적 프록시 호출 시작

public class RPCConsumerMain {
    public static void main(String[] args) throws InterruptedException {
        HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083);
        for (int i = 0;i < 1000;i++) {
            String hello = service.sayHello("你好_" + i);
            System.out.println(hello);
            Thread.sleep(1000);
        }
    }
}

실행 결과

hello, hello_0
hello, hello_1
hello, hello_2
hello, hello_3
hello, Hello_4
hello, hello_5

....

Netty+ProtoBuffer를 사용하여 고성능 RPC 프레임워크로 확장하려는 경우 Protobuffer를 통합한 Netty의 관련 작성 방법을 참조할 수 있습니다.

추천 튜토리얼: "PHP"

위 내용은 Java 필기 RPC 프레임워크의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 oschina.net에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제