検索
ホームページJava&#&チュートリアルソファ RPC サーバーのソース コードの詳細な分析 (フローチャート付き)

この記事は、sofa-rpc サーバーのソース コードを詳細に分析したものです (フローチャートが添付されています)。必要な方は参考にしていただければ幸いです。

sofa-rpc は、Alibaba によってオープンソース化されている高パフォーマンスの RPC フレームワークです。この記事では主に、sofa-rpc プロバイダーの起動サービス プロセスのコードを説明します。以下は、私が単純に描いた基本的な関係プロセスです。図

以下では、sofa-rpc コードに基づいてプロセスを追跡し、読み取ります。例として、BoltServer を取り上げます。

    public static void main(String[] args) {
        ApplicationConfig application = new ApplicationConfig().setAppName("test-server");

        ServerConfig serverConfig = new ServerConfig()
            .setPort(22000)
            .setDaemon(false);

        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName())
            .setApplication(application)
            .setRef(new HelloServiceImpl())
            .setServer(serverConfig)
            .setRegister(false);

        ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>()
            .setInterfaceId(EchoService.class.getName())
            .setApplication(application)
            .setRef(new EchoServiceImpl())
            .setServer(serverConfig)
            .setRegister(false);

        providerConfig.export();
        providerConfig2.export();

        LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);
    }

sofa-rpc が ProviderConfig クラスを通じてサービス プロバイダー Provider の構成情報を初期化し、エクスポートも提供していることがわかります。サービス起動のエントリポイントとして機能します。

    public synchronized void export() {        
    if (providerBootstrap == null) {
            providerBootstrap = Bootstraps.from(this);
        }
        providerBootstrap.export();
    }

<span style="font-size: 14px; font-family: " microsoft yahei>根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)</span><span style="font-size: 14px; font-family: " microsoft yahei>可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap <br></span>

/**
     * 发布一个服务
     *
     * @param providerConfig 服务发布者配置
     * @param <T>            接口类型
     * @return 发布启动类
     */
    public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
        String bootstrap = providerConfig.getBootstrap();
        if (StringUtils.isEmpty(bootstrap)) {
            // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap
            bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
            providerConfig.setBootstrap(bootstrap);
        }
        ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
            .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });
        return (ProviderBootstrap<T>) providerBootstrap;
    }

DefaultProviderBootstrap と DubboProviderBootstrap は両方とも ProviderBootstrap を継承します。

DefaultProviderBootstrap は、BoltProviderBootstrap、Http2ClearTextProviderBootstrap、および RestProviderBootstrap の 3 つのクラスによって継承されます。これは、実際には、sofa-rpc の 3 つのサーバー サービス メソッドに対応します。

DefaultProviderBootstrap サービスのスタートアップ ソース コードを見てみましょう

  @Override
    public void export() {
        if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
            Thread thread = factory.newThread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(providerConfig.getDelay());
                    } catch (Throwable ignore) { // NOPMD
                    }
                    doExport();
                }
            });
            thread.start();
        } else {
            doExport();
        }
    }

    private void doExport() {
        if (exported) {
            return;
        }

        // 检查参数
        checkParameters();

        String appName = providerConfig.getAppName();

        //key  is the protocol of server,for concurrent safe
        Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
        // 将处理器注册到server
        List<ServerConfig> serverConfigs = providerConfig.getServer();
        for (ServerConfig serverConfig : serverConfigs) {
            String protocol = serverConfig.getProtocol();

            String key = providerConfig.buildKey() + ":" + protocol;

            if (LOGGER.isInfoEnabled(appName)) {
                LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
            }

            // 注意同一interface,同一uniqleId,不同server情况
            AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
            if (cnt == null) { // 没有发布过
                cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
            }
            int c = cnt.incrementAndGet();
            hasExportedInCurrent.put(serverConfig.getProtocol(), true);
            int maxProxyCount = providerConfig.getRepeatedExportLimit();
            if (maxProxyCount > 0) {
                if (c > maxProxyCount) {
                    decrementCounter(hasExportedInCurrent);
                    // 超过最大数量,直接抛出异常
                    throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
                        + " has been exported more than " + maxProxyCount + " times!"
                        + " Maybe it&#39;s wrong config, please check it."
                        + " Ignore this if you did that on purpose!");
                } else if (c > 1) {
                    if (LOGGER.isInfoEnabled(appName)) {
                        LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
                            + " Maybe it&#39;s wrong config, please check it."
                            + " Ignore this if you did that on purpose!", key);
                    }
                }
            }

        }

        try {
            // 构造请求调用器
            providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
            // 初始化注册中心
            if (providerConfig.isRegister()) {
                List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
                if (CommonUtils.isNotEmpty(registryConfigs)) {
                    for (RegistryConfig registryConfig : registryConfigs) {
                        RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                    }
                }
            }
            // 将处理器注册到server
            for (ServerConfig serverConfig : serverConfigs) {
                try {
                    //构建Server
                    Server server = serverConfig.buildIfAbsent();
                    // 注册序列化接口
                    server.registerProcessor(providerConfig, providerProxyInvoker);
                    if (serverConfig.isAutoStart()) {
                        //启动服务
                        server.start();
                    }

                } catch (SofaRpcRuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
                        + serverConfig.getId(), e);
                }
            }

            // 注册到注册中心
            providerConfig.setConfigListener(new ProviderAttributeListener());
            register();
        } catch (Exception e) {
            decrementCounter(hasExportedInCurrent);

            if (e instanceof SofaRpcRuntimeException) {
                throw (SofaRpcRuntimeException) e;
            } else {
                throw new SofaRpcRuntimeException("Build provider proxy error!", e);
            }
        }

        // 记录一些缓存数据
        RpcRuntimeContext.cacheProviderConfig(this);
        exported = true;
    }

コードでは、Server サービス オブジェクトは、serverConfig.buildIfAbsent によって構築されます。 ()、および buildIfAbsent() 関数では、サーバーが SeverFactory ファクトリを通じて取得されることがわかります。SeverFactory の getSever() では、SeverConfig および Init() の構成に従ってサーバーの特定のインスタンスが取得されます。初期化のために実行されます。

    /**
     * 启动服务
     *
     * @return the server
     */
    public synchronized Server buildIfAbsent() {
        if (server != null) {
            return server;
        }
        // 提前检查协议+序列化方式
        // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
        //                SerializationType.valueOf(getSerialization()));
        
        //在sever工厂中拿到sever实例
        server = ServerFactory.getServer(this);
        return server;
    }

/**
     * 初始化Server实例
     *
     * @param serverConfig 服务端配置
     * @return Server
     */
    public synchronized static Server getServer(ServerConfig serverConfig) {
        try {
            Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
            if (server == null) {
                // 算下网卡和端口
                resolveServerConfig(serverConfig);

                ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
                    .getExtensionClass(serverConfig.getProtocol());
                if (ext == null) {
                    throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
                        "Unsupported protocol of server!");
                }
                server = ext.getExtInstance();
                //服务初始化
                server.init(serverConfig);
                SERVER_MAP.put(serverConfig.getPort() + "", server);
            }
            return server;
        } catch (SofaRpcRuntimeException e) {
            throw e;
        } catch (Throwable e) {
            throw new SofaRpcRuntimeException(e.getMessage(), e);
        }
    }

sofa-rpc は、BoltServer、RestServer、および AbstractHttpServer の 3 つのサーバー タイプを提供します

BoltServer の通信の最下層は、Alibaba のソファボルト通信フレームワークに基づいて開発された RemotingServer を通じて実装されます。

  /**
     * Bolt服务端
     */
    protected RemotingServer       remotingServer;

   @Override
    public void start() {
        if (started) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            // 生成阿里基于netty的bolt服务Server对象 
            remotingServer = initRemotingServer();
            try {
                if (remotingServer.start(serverConfig.getBoundHost())) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
                            serverConfig.getPort());
                    }
                } else {
                    throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
                }
                started = true;

                if (EventBus.isEnable(ServerStartedEvent.class)) {
                    EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                }

            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
            }
        }
    }

AbstractHttpServer は http サービスを提供し、基礎となる通信は ServerTransport クラスを通じて実装されます

 /**
     * 服务端通讯层
     */
    private ServerTransport         serverTransport;

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        this.serverTransportConfig = convertConfig(serverConfig);
        // 启动线程池
        this.bizThreadPool = initThreadPool(serverConfig);
        // 服务端处理器
        this.serverHandler = new HttpServerHandler();

        // set default transport config
        this.serverTransportConfig.setContainer(container);
        this.serverTransportConfig.setServerHandler(serverHandler);
    }

    @Override
    public void start() {
        if (started) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            try {
                // 启动线程池
                this.bizThreadPool = initThreadPool(serverConfig);
                this.serverHandler.setBizThreadPool(bizThreadPool);
                //实例化服务,具体代码见
                serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
                started = serverTransport.start();

                if (started) {
                    if (EventBus.isEnable(ServerStartedEvent.class)) {
                        EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                    }
                }
            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e);
            }
        }
    }

ServerTransport は抽象クラスです。具体的な実装は、Rest サービスを提供するトランスポート パッケージの下の AbstractHttp2ServerTransport です。 SofaNettyJaxrsServer 内。

  /**
     * 构造函数
     *
     * @param transportConfig 服务端配置
     */
    protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) {
        super(transportConfig);
    }

 @Override
    public boolean start() {
        if (serverBootstrap != null) {
            return true;
        }
        synchronized (this) {
            if (serverBootstrap != null) {
                return true;
            }
            boolean flag = false;
            SslContext sslCtx = SslContextBuilder.build();

            // Configure the server.
            EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig);

            //可以看到然是基于Netty
            HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler();
            bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool());

            serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, bizGroup)
                .channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
                .option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())
                .option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())
                .option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                .childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())
                .childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())
                .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
                .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                    transportConfig.getBufferMin(), transportConfig.getBufferMax()))
                .childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx,
                    httpServerHandler, transportConfig.getPayload()));

            // 绑定到全部网卡 或者 指定网卡
            ChannelFuture future = serverBootstrap.bind(
                new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort()));
            ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("HTTP/2 Server bind to {}:{} success!",
                                transportConfig.getHost(), transportConfig.getPort());
                        }
                    } else {
                        LOGGER.error("HTTP/2 Server bind to {}:{} failed!",
                            transportConfig.getHost(), transportConfig.getPort());
                        stop();
                    }
                }
            });

            try {
                channelFuture.await();
                if (channelFuture.isSuccess()) {
                    flag = Boolean.TRUE;
                } else {
                    throw new SofaRpcRuntimeException("Server start fail!", future.cause());
                }
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
            }
            return flag;
        }
    }

SofaNettyJaxrsServer でサービスを開始するための特定のコード

  /**
     * Rest服务端
     */
    protected SofaNettyJaxrsServer httpServer;

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        httpServer = buildServer();
    }

OK、上記はソファ - RPC サーバーの起動の基本プロセスです。ここでは、コード機能の詳細な分析は行わず、単純なサービスの起動プロセスのみに焦点を当てます。これに基づいて、コードの具体的な実装をさらに検討できます。

以上がソファ RPC サーバーのソース コードの詳細な分析 (フローチャート付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明
この記事は博客园で複製されています。侵害がある場合は、admin@php.cn までご連絡ください。
高度なJavaプロジェクト管理、自動化の構築、依存関係の解像度にMavenまたはGradleを使用するにはどうすればよいですか?高度なJavaプロジェクト管理、自動化の構築、依存関係の解像度にMavenまたはGradleを使用するにはどうすればよいですか?Mar 17, 2025 pm 05:46 PM

この記事では、Javaプロジェクト管理、自動化の構築、依存関係の解像度にMavenとGradleを使用して、アプローチと最適化戦略を比較して説明します。

適切なバージョン化と依存関係管理を備えたカスタムJavaライブラリ(JARファイル)を作成および使用するにはどうすればよいですか?適切なバージョン化と依存関係管理を備えたカスタムJavaライブラリ(JARファイル)を作成および使用するにはどうすればよいですか?Mar 17, 2025 pm 05:45 PM

この記事では、MavenやGradleなどのツールを使用して、適切なバージョン化と依存関係管理を使用して、カスタムJavaライブラリ(JARファイル)の作成と使用について説明します。

カフェインやグアバキャッシュなどのライブラリを使用して、Javaアプリケーションにマルチレベルキャッシュを実装するにはどうすればよいですか?カフェインやグアバキャッシュなどのライブラリを使用して、Javaアプリケーションにマルチレベルキャッシュを実装するにはどうすればよいですか?Mar 17, 2025 pm 05:44 PM

この記事では、カフェインとグアバキャッシュを使用してJavaでマルチレベルキャッシュを実装してアプリケーションのパフォーマンスを向上させています。セットアップ、統合、パフォーマンスの利点をカバーし、構成と立ち退きポリシー管理Best Pra

キャッシュや怠zyなロードなどの高度な機能を備えたオブジェクトリレーショナルマッピングにJPA(Java Persistence API)を使用するにはどうすればよいですか?キャッシュや怠zyなロードなどの高度な機能を備えたオブジェクトリレーショナルマッピングにJPA(Java Persistence API)を使用するにはどうすればよいですか?Mar 17, 2025 pm 05:43 PM

この記事では、キャッシュや怠zyなロードなどの高度な機能を備えたオブジェクトリレーショナルマッピングにJPAを使用することについて説明します。潜在的な落とし穴を強調しながら、パフォーマンスを最適化するためのセットアップ、エンティティマッピング、およびベストプラクティスをカバーしています。[159文字]

Javaのクラスロードメカニズムは、さまざまなクラスローダーやその委任モデルを含むどのように機能しますか?Javaのクラスロードメカニズムは、さまざまなクラスローダーやその委任モデルを含むどのように機能しますか?Mar 17, 2025 pm 05:35 PM

Javaのクラスロードには、ブートストラップ、拡張機能、およびアプリケーションクラスローダーを備えた階層システムを使用して、クラスの読み込み、リンク、および初期化が含まれます。親の委任モデルは、コアクラスが最初にロードされ、カスタムクラスのLOAに影響を与えることを保証します

分散コンピューティングにJavaのRMI(リモートメソッドの呼び出し)を使用するにはどうすればよいですか?分散コンピューティングにJavaのRMI(リモートメソッドの呼び出し)を使用するにはどうすればよいですか?Mar 11, 2025 pm 05:53 PM

この記事では、分散アプリケーションを構築するためのJavaのリモートメソッドの呼び出し(RMI)について説明します。 インターフェイスの定義、実装、レジストリのセットアップ、およびクライアント側の呼び出しを詳述し、ネットワークの問題やセキュリティなどの課題に対処します。

ネットワーク通信にJavaのソケットAPIを使用するにはどうすればよいですか?ネットワーク通信にJavaのソケットAPIを使用するにはどうすればよいですか?Mar 11, 2025 pm 05:53 PM

この記事では、ネットワーク通信のためのJavaのソケットAPI、クライアントサーバーのセットアップ、データ処理、リソース管理、エラー処理、セキュリティなどの重要な考慮事項をカバーしています。 また、パフォーマンスの最適化手法も調査します

Javaでカスタムネットワークプロトコルを作成するにはどうすればよいですか?Javaでカスタムネットワークプロトコルを作成するにはどうすればよいですか?Mar 11, 2025 pm 05:52 PM

この記事では、カスタムJavaネットワーキングプロトコルの作成を詳述しています。 プロトコルの定義(データ構造、フレーミング、エラー処理、バージョン化)、実装(ソケットを使用)、データシリアル化、およびベストプラクティス(効率、セキュリティ、メンテナ

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser は、オンライン試験を安全に受験するための安全なブラウザ環境です。このソフトウェアは、あらゆるコンピュータを安全なワークステーションに変えます。あらゆるユーティリティへのアクセスを制御し、学生が無許可のリソースを使用するのを防ぎます。

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

AtomエディタMac版ダウンロード

AtomエディタMac版ダウンロード

最も人気のあるオープンソースエディター

SublimeText3 英語版

SublimeText3 英語版

推奨: Win バージョン、コードプロンプトをサポート!

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター