Interviewers like to ask:

  • How to design a registration center?
  • How to design a message queue?
  • How to design a persistence framework?
  • How to design an RPC framework?
  • ......

Today, let’s talk about “RPC implementation principle” (other related topics: how to design an XX series, It has been published in Knowledge Planet)So first let’s clarify a question: What is RPC?

RPC is the abbreviation of Remote Procedure Call, that is, remote procedure call.

RPC is a computer communication protocol. This protocol allows a program running on one computer to call a subroutine on another computer without the developer having to additionally program this interaction.

It is worth noting that two or more applications are distributed on different servers, and calls between them are like local method calls. Next, let's analyze what happens in an RPC call?

Basic process of RPC calls

Some of the more popular RPC frameworks in the industry, such as those provided by Dubbo Interface-based remote method invocation means that the client only needs to know the definition of the interface to call remote services. In Java, interfaces cannot directly call instance methods. This operation must be done through their implementation class objects, which means that the client must generate proxy objects for these interfaces. For this purpose, Java provides support for Proxy and InvocationHandler to generate dynamic proxies; generate If you have a proxy object, how is each specific method called? When the proxy object generated by the JDK dynamic proxy calls the specified method, it will actually execute the #invoke method defined in the InvocationHandler, in which the remote method call is completed and the result is obtained.

Putting aside the client, looking back, RPC is a call between two computers. It is essentially a network communication between two hosts. When it comes to network communication, there must be serialization, deserialization, encoding and decoding, etc. Some issues that must be considered; at the same time, in fact, most systems are deployed in clusters. Multiple hosts/containers provide the same services to the outside world. If the number of nodes in the cluster is large, managing service addresses will also be very cumbersome. A common practice is that each service node registers its address and provided service list to a registration center, and the registration center manages the service list in a unified manner; this approach solves some problems and adds a new feature to the client. The job - that is, service discovery, generally speaking, is to find the service list corresponding to the remote method from the registration center and select a service address from it through a certain strategy to complete network communication.

After talking about the client and the registration center, another important role is naturally the server. The most important task of the server is to provide the real implementation of the service interface and monitor network requests on a certain port. After the request, the corresponding parameters (such as service interface, method, request parameters, etc.) are obtained from the network request, and then based on these parameters, the real implementation of the interface is called through reflection to obtain the result and written into the corresponding response stream.

To sum up, a basic RPC call process is roughly as follows:

Alibaba interviewer: Please handwrite an RPC framework for me.

Basic implementation

Server (producer)

Service interface:

In RPC, production Authors and consumers have a common service interface API. As follows, define a HelloService interface.

 * @Descrption  服务接口
public interface HelloService {
    String sayHello(String somebody);

Service implementation:

The producer needs to provide the implementation of the service interface and create the HelloServiceImpl implementation class.

 * @Descrption 服务实现
public class HelloServiceImpl implements HelloService {
    public String sayHello(String somebody) {
        return "hello " + somebody + "!";

Service registration:

This example uses Spring to manage beans, and uses custom XML and parsers to load the service implementation class into the container (of course, custom annotations can also be used method (not discussed here) and register the service interface information to the registration center.

First customize the XSD:

<xsd:element name="service">
            <xsd:extension base="beans:identifiedType">
                <xsd:attribute name="interface" type="xsd:string" use="required"/>
                <xsd:attribute name="timeout" type="xsd:int" use="required"/>
                <xsd:attribute name="serverPort" type="xsd:int" use="required"/>
                <xsd:attribute name="ref" type="xsd:string" use="required"/>
                <xsd:attribute name="weight" type="xsd:int" use="optional"/>
                <xsd:attribute name="workerThreads" type="xsd:int" use="optional"/>
                <xsd:attribute name="appKey" type="xsd:string" use="required"/>
                <xsd:attribute name="groupName" type="xsd:string" use="optional"/>

Specify Schema and XSD respectively, Schema and the mapping of the corresponding Handler.





Put the written file into the META-INF directory under Classpath:

Alibaba interviewer: Please handwrite an RPC framework for me.

在 Spring 配置文件中配置服务类:

<!-- 发布远程服务 -->
 <bean id="helloService" class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/>
 <storm:service id="helloServiceRegister"

编写对应的 Handler 和 Parser:


import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

 * @author 孙浩
 * @Descrption 服务发布自定义标签
public class StormServiceNamespaceHandler extends NamespaceHandlerSupport {
    public void init() {
        registerBeanDefinitionParser("service", new ProviderFactoryBeanDefinitionParser());


protected Class getBeanClass(Element element) {
        return ProviderFactoryBean.class;

    protected void doParse(Element element, BeanDefinitionBuilder bean) {

        try {
            String serviceItf = element.getAttribute("interface");
            String serverPort = element.getAttribute("serverPort");
            String ref = element.getAttribute("ref");
            // ....
            bean.addPropertyValue("serverPort", Integer.parseInt(serverPort));
            bean.addPropertyValue("serviceItf", Class.forName(serviceItf));
            bean.addPropertyReference("serviceObject", ref);
            if (NumberUtils.isNumber(weight)) {
                bean.addPropertyValue("weight", Integer.parseInt(weight));
       } catch (Exception e) {
            // ...        


 * @Descrption 服务发布
public class ProviderFactoryBean implements FactoryBean, InitializingBean {

    private Class<?> serviceItf;
    private Object serviceObject;
    private String serverPort;
    private long timeout;
    private Object serviceProxyObject;
    private String appKey;
    private String groupName = "default";
    //服务提供者权重,默认为 1 , 范围为 [1-100]
    private int weight = 1;
    //服务端线程数,默认 10 个线程
    private int workerThreads = 10;

    public Object getObject() throws Exception {
        return serviceProxyObject;

    public Class<?> getObjectType() {
        return serviceItf;

    public void afterPropertiesSet() throws Exception {
        //启动 Netty 服务端
        //注册到 zk, 元数据注册中心
        List<ProviderService> providerServiceList = buildProviderServiceInfos();
        IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();

public void registerProvider(final List<ProviderService> serviceMetaData) {
    if (CollectionUtils.isEmpty(serviceMetaData)) {

    //连接 zk, 注册服务
    synchronized (RegisterCenter.class) {
        for (ProviderService provider : serviceMetaData) {
            String serviceItfKey = provider.getServiceItf().getName();

            List<ProviderService> providers = providerServiceMap.get(serviceItfKey);
            if (providers == null) {
                providers = Lists.newArrayList();
            providerServiceMap.put(serviceItfKey, providers);

        if (zkClient == null) {
            zkClient = new ZkClient(ZK_SERVICE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());

        //创建 ZK 命名空间/当前部署应用 APP 命名空间/
        String APP_KEY = serviceMetaData.get(0).getAppKey();
        String ZK_PATH = ROOT_PATH + "/" + APP_KEY;
        boolean exist = zkClient.exists(ZK_PATH);
        if (!exist) {
            zkClient.createPersistent(ZK_PATH, true);

        for (Map.Entry<String, List<ProviderService>> entry : providerServiceMap.entrySet()) {
            String groupName = entry.getValue().get(0).getGroupName();
            String serviceNode = entry.getKey();
            String servicePath = ZK_PATH + "/" + groupName + "/" + serviceNode + "/" + PROVIDER_TYPE;
            exist = zkClient.exists(servicePath);
            if (!exist) {
                zkClient.createPersistent(servicePath, true);

            int serverPort = entry.getValue().get(0).getServerPort();//服务端口
            int weight = entry.getValue().get(0).getWeight();//服务权重
            int workerThreads = entry.getValue().get(0).getWorkerThreads();//服务工作线程
            String localIp = IPHelper.localIp();
            String currentServiceIpNode = servicePath + "/" + localIp + "|" + serverPort + "|" + weight + "|" + workerThreads + "|" + groupName;
            exist = zkClient.exists(currentServiceIpNode);
            if (!exist) {
            zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    if (currentChilds == null) {
                        currentChilds = Lists.newArrayList();
                    //存活的服务 IP 列表
                    List<String> activityServiceIpList = Lists.newArrayList(Lists.transform(currentChilds, new Function<String, String>() {
                        public String apply(String input) {
                            return StringUtils.split(input, "|")[0];


至此服务实现类已被载入 Spring 容器中,且服务接口信息也注册到了注册中心。


作为生产者对外提供 RPC 服务,必须有一个网络程序来来监听请求和做出响应。在 Java 领域 Netty 是一款高性能的 NIO 通信框架,很多的框架的通信都是采用 Netty 来实现的,本例中也采用它当做通信服务器。

构建并启动 Netty 服务监听指定端口:

public void start(final int port) {
        synchronized (NettyServer.class) {
            if (bossGroup != null || workerGroup != null) {

            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
                    .group(bossGroup, workerGroup)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //注册解码器 NettyDecoderHandler
                            ch.pipeline().addLast(new NettyDecoderHandler(StormRequest.class, serializeType));
                            //注册编码器 NettyEncoderHandler
                            ch.pipeline().addLast(new NettyEncoderHandler(serializeType));
                            //注册服务端业务逻辑处理器 NettyServerInvokeHandler
                            ch.pipeline().addLast(new NettyServerInvokeHandler());
            try {
                channel = serverBootstrap.bind(port).sync().channel();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);

上面的代码中向 Netty 服务的 Pipeline 中添加了编解码和业务处理器,当接收到请求时,经过编解码后,真正处理业务的是业务处理器,即 NettyServerInvokeHandler,该处理器继承自 SimpleChannelInboundHandler,当数据读取完成将触发一个事件,并调用 NettyServerInvokeHandler#channelRead0 方法来处理请求。

protected void channelRead0(ChannelHandlerContext ctx, StormRequest request) throws Exception {
    if (ctx.channel().isWritable()) {
        ProviderService metaDataModel = request.getProviderService();
        long consumeTimeOut = request.getInvokeTimeout();
        final String methodName = request.getInvokedMethodName();

        String serviceKey = metaDataModel.getServiceItf().getName();
        int workerThread = metaDataModel.getWorkerThreads();
        Semaphore semaphore = serviceKeySemaphoreMap.get(serviceKey);
        if (semaphore == null) {
            synchronized (serviceKeySemaphoreMap) {
                semaphore = serviceKeySemaphoreMap.get(serviceKey);
                if (semaphore == null) {
                    semaphore = new Semaphore(workerThread);
                    serviceKeySemaphoreMap.put(serviceKey, semaphore);

        IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
        List<ProviderService> localProviderCaches = registerCenter4Provider.getProviderServiceMap().get(serviceKey);

        Object result = null;
        boolean acquire = false;

        try {
            ProviderService localProviderCache = Collections2.filter(localProviderCaches, new Predicate<ProviderService>() {
                public boolean apply(ProviderService input) {
                    return StringUtils.equals(input.getServiceMethod().getName(), methodName);
            Object serviceObject = localProviderCache.getServiceObject();

            Method method = localProviderCache.getServiceMethod();
            //利用 semaphore 实现限流
            acquire = semaphore.tryAcquire(consumeTimeOut, TimeUnit.MILLISECONDS);
            if (acquire) {
                result = method.invoke(serviceObject, request.getArgs());
        } catch (Exception e) {
            System.out.println(JSON.toJSONString(localProviderCaches) + "  " + methodName+" "+e.getMessage());
            result = e;
        } finally {
            if (acquire) {
        StormResponse response = new StormResponse();
    } else {
        logger.error("------------channel closed!---------------");

此处还有部分细节如自定义的编解码器等,篇幅所限不在此详述,继承 MessageToByteEncoder 和 ByteToMessageDecoder 覆写对应的 encode 和 decode 方法即可自定义编解码器,使用到的序列化工具如 Hessian/Proto 等可参考对应的官方文档。


为便于封装请求和响应,定义两个 bean 来表示请求和响应。


 * @author 孙浩
 * @Descrption
public class StormRequest implements Serializable {

    private static final long serialVersionUID = -5196465012408804755L;
    private String uniqueKey;
    private ProviderService providerService;
    private String invokedMethodName;
    private Object[] args;
    private String appName;
    private long invokeTimeout;
    // getter/setter


 * @Descrption
public class StormResponse implements Serializable {
    private static final long serialVersionUID = 5785265307118147202L;
    //UUID, 唯一标识一次返回值
    private String uniqueKey;
    private long invokeTimeout;
    private Object result;


客户端(消费者)在 RPC 调用中主要是生成服务接口的代理对象,并从注册中心获取对应的服务列表发起网络请求。

客户端和服务端一样采用 Spring 来管理 bean 解析 XML 配置等不再赘述,重点看下以下几点:

1、通过 JDK 动态代理来生成引入服务接口的代理对象

public Object getProxy() {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{targetInterface}, this);


String serviceKey = targetInterface.getName();
IRegisterCenter4Invoker registerCenter4Consumer = RegisterCenter.singleton();
List<ProviderService> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consume().get(serviceKey);
ClusterStrategy clusterStrategyService = ClusterEngine.queryClusterStrategy(clusterStrategy);
ProviderService providerService = clusterStrategyService.select(providerServices);

3、通过 Netty 建立连接,发起网络请求

 * @author 孙浩
 * @Descrption Netty 消费端 bean 代理工厂
public class RevokerProxyBeanFactory implements InvocationHandler {
    private ExecutorService fixedThreadPool = null;
    private Class<?> targetInterface;
    private int consumeTimeout;
    private static int threadWorkerNumber = 10;
    private String clusterStrategy;

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {


        ProviderService newProvider = providerService.copy();

        //声明调用 AresRequest 对象,AresRequest 表示发起一次调用所包含的信息
        final StormRequest request = new StormRequest();
        request.setUniqueKey(UUID.randomUUID().toString() + "-" + Thread.currentThread().getId());

        try {
            if (fixedThreadPool == null) {
                synchronized (RevokerProxyBeanFactory.class) {
                    if (null == fixedThreadPool) {
                        fixedThreadPool = Executors.newFixedThreadPool(threadWorkerNumber);
            //根据服务提供者的 ip,port, 构建 InetSocketAddress 对象,标识服务提供者地址
            String serverIp = request.getProviderService().getServerIp();
            int serverPort = request.getProviderService().getServerPort();
            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort);
            //提交本次调用信息到线程池 fixedThreadPool, 发起调用
            Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
            StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
            if (response != null) {
                return response.getResult();
        } catch (Exception e) {
            throw new RuntimeException(e);
        return null;
    //  ...

Netty 的响应是异步的,为了在方法调用返回前获取到响应结果,需要将异步的结果同步化。

4、Netty 异步返回的结果存入阻塞队列

protected void channelRead0(ChannelHandlerContext channelHandlerContext, StormResponse response) throws Exception {
    //将 Netty 异步返回的结果存入阻塞队列,以便调用端同步获取


//提交本次调用信息到线程池 fixedThreadPool, 发起调用
Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
if (response != null) {
    return response.getResult();

//从返回结果容器中获取返回结果,同时设置等待超时时间为 invokeTimeout
long invokeTimeout = request.getInvokeTimeout();
StormResponse response = RevokerResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);



 * @Descrption
public class MainServer {
    public static void main(String[] args) throws Exception {
        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-server.xml");
        System.out.println(" 服务发布完成");


public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {

        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-client.xml");
        final HelloService helloService = (HelloService) context.getBean("helloService");
        String result = helloService.sayHello("World");
        for (;;) {




本文简单介绍了 RPC 的整个流程,并实现了一个简单的 RPC 调用。希望阅读完本文之后,能加深你对 RPC 的一些认识。


  • Load the service interface and cache
  • Service registration, write the service interface and service host information to the registration center (this example uses ZooKeeper)
  • Start the network server and listen to
  • Reflection, call locally

Consumer side process:

  • The proxy service interface generates a proxy object
  • Service discovery (connect to ZooKeeper, get the service address list, and obtain the appropriate service through the client load policy Address)
  • Remote method call (in this example, send a message through Netty and get the response result)

The above is the detailed content of Alibaba interviewer: Please handwrite an RPC framework for me.. For more information, please follow other related articles on the PHP Chinese website!

