検索
ホームページJava&#&チュートリアルJava 開発 RocketMQ プロデューサーの高可用性の例の分析

    1 メッセージ

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
        //主题名字
        private String topic;
        //消息扩展信息,Tag,keys,延迟级别都存在这里
        private Map<String, String> properties;
        //消息体,字节数组
        private byte[] body;
        //设置消息的key,
        public void setKeys(String keys) {}
        //设置topic
        public void setTopic(String topic) {}
        //延迟级别
        public int setDelayTimeLevel(int level) {}
        //消息过滤的标记
        public void setTags(String tags) {}
        //扩展信息存放在此
        public void putUserProperty(final String name, final String value) {}
    }

    メッセージは子供たちです。子供たちは独自の特徴を持っていますが、共通点もあります。同じ親から送られた 2 人の子供は、同じ場所に行くことも、別の場所に行くこともできます。

    1.1 topic

    まず、各子メッセージには属性 topic があり、これは上で説明した待機ホールです。子どもたちは入ってきた後、指定された待合室の 指定エリア まで歩き (外出するときも指定のホームから高速鉄道に乗りますよね)、# の席に座ります。 ##メッセージキュー席 待って、旅行を待ちます。

    ブローカーには 1 つ以上のトピックがあり、メッセージはトピック内のメッセージ キューに保存され、消費されるのを待ちます。

    1.2 ボディ

    子供のニュース、

    ボディ属性もあります。これは彼の 能力で、彼は絵を描くことができ、歌うことができ、彼は歌うことができます。何を行っても、Body 属性に記録されます。外出時に値が反映される場所もBody属性です。

    Body はメッセージ本文であり、コンシューマはメッセージ本文に基づいて対応する操作を実行します。

    1.3 タグ

    このタグは前のセクションで説明しました。これは

    マーク です。お絵かきボードやカメラを背中に背負っている子供もいますし、クルーズ船もいます。 特別に見つけてください。子供たちは を引き離し、任務を完了します。

    メッセージのタグ属性を設定でき、コンシューマは特定のタグ属性を含むメッセージを選択して消費できます。

    1.4 キー

    キーは、各子供のメッセージの

    name です。子供を見つけたいなら、名前を呼んでください。

    送信メッセージのキーを設定すると、後でこのキーに基づいてメッセージを検索できます。たとえば、メッセージに異常がある場合やメッセージが失われた場合に、検索するのに非常に便利です。

    1.5 遅延レベル

    もちろん、来てもすぐに帰らない子もいます。よく考えてから来ます。食事には 30 分かかるので、来たら待ってます。30分後に迎えに来ます。

    遅延レベルを設定すると、メッセージを消費できる時間を指定できます。

    2 プロデューサーの高可用性

    子供を送り出す親は皆、子供を待合室に送り届けたいと考えており、子供が

    迷子になることを望んでいません。現時点では、待合室には何らかの保証メカニズムが必要です。 2.1 クライアントはプロデューサーの高可用性を保証します

    2.1.1 再試行メカニズム

    つまり、親が子供を待機ホールに送った後、
    メッセージ キューの座席

    に座れなかった場合、スタッフは空席があるかどうかを確認するために再度挑戦するよう手配します。デフォルトの再試行回数は 2 回です。つまり、メッセージの子には座席を見つける機会が合計 3 回あります。 ソースコードを見て、大まかに理解できるように特別にコメントを追加しました。

    //这里取到了重试的次数
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        //获取消息队列
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                //发送消息
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                ...
            } catch (RemotingException e) {
                ...
                continue;
            } catch (MQClientException e) {
                ...
                continue;
            } catch (MQBrokerException e) {
                ...
                continue;
            } catch (InterruptedException e) {
                //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试
                throw e;
            }
        } else {
            break;
        }
    }

    再試行コードは上記のとおりです。この

    sendDefaultImpl

    メソッドでは、メッセージの送信を 3 回試行します。失敗すると、対応するエラーがスローされます。 2.1.2 クライアントのフォールト トレランス

    ブローカーの待機ホールが複数ある場合、サービス スタッフは、
    比較的混雑の少ないホールを選択するよう子供にメッセージを送信します。 #comparison 入力するのは簡単です

    を入力します。もちろん、 が閉鎖され、 が停電し、 がサービス機能を持たない は入力されません。 MQ クライアントは、ブローカーの送信遅延情報を維持し、この情報に基づいて、メッセージを送信する比較的遅延の少ないブローカーを選択します。ダウンしている、利用できない、または送信遅延レベルが高いブローカーを積極的に排除します。 Broker を選択すると、

    message queue

    が選択されます。対応するコードは次のとおりです。

    ここでは、まず 遅延フォールト トレランス スイッチがオンになっているかどうかを判断します。このスイッチは デフォルトではオフになっています。オンになっている場合は、

    より低い遅延##が最初に選択されます。#ブローカー。

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //判断发送延迟容错开关是否开启
        if (this.sendLatencyFaultEnable) {
            try {
                //选择一个延迟上可以接受,并且和上次发送相同的Broker
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //若是Broker的延迟时间可以接受,则返回这个Broker
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            //若是前边都没选中一个Broker,就随机选一个Broker
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    ただし、遅延フォールト トレランス スイッチオフ状態の場合、実行されるコードは次のとおりです。

    圧力を均等に分散するためにブローカーでは、以前とは

    A 異なるブローカー が選択されます。

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //若是没有上次的Brokername做参考,就随机选一个
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //如果有,那么就选一个其他的Broker
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //这里判断遇上一个使用的Broker不是同一个
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            //若是上边的都没选中,那么就随机选一个
            return selectOneMessageQueue();
        }
    }
    2.2 ブローカー側はプロデューサーの高可用性を確保します

    ブローカー待機ホールで

    メッセージの子を正確に受信するために、少なくとも 2 つのホールがあり、1 つはホールになります。 メインホール

    副ホール

    があります。通常、子供たちは

    メインホール

    に入り、しばらくするとカードが機械を信じてビジー状態になるはずです(シャドウクローンの技術) そして、クローンを副ホールに入らせて、メインホールが停電して機能しなくなっても、副ホールのクローンはタスクを完了する限り大丈夫です。一般的には、子供が仕事を完了するために船に乗りに行くという本殿からのメッセージです。

    以上がJava 開発 RocketMQ プロデューサーの高可用性の例の分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明
    この記事は亿速云で複製されています。侵害がある場合は、admin@php.cn までご連絡ください。
    高度なJavaプロジェクト管理、自動化の構築、依存関係の解像度にMavenまたはGradleを使用するにはどうすればよいですか?高度なJavaプロジェクト管理、自動化の構築、依存関係の解像度にMavenまたはGradleを使用するにはどうすればよいですか?Mar 17, 2025 pm 05:46 PM

    この記事では、Javaプロジェクト管理、自動化の構築、依存関係の解像度にMavenとGradleを使用して、アプローチと最適化戦略を比較して説明します。

    適切なバージョン化と依存関係管理を備えたカスタムJavaライブラリ(JARファイル)を作成および使用するにはどうすればよいですか?適切なバージョン化と依存関係管理を備えたカスタムJavaライブラリ(JARファイル)を作成および使用するにはどうすればよいですか?Mar 17, 2025 pm 05:45 PM

    この記事では、MavenやGradleなどのツールを使用して、適切なバージョン化と依存関係管理を使用して、カスタムJavaライブラリ(JARファイル)の作成と使用について説明します。

    カフェインやグアバキャッシュなどのライブラリを使用して、Javaアプリケーションにマルチレベルキャッシュを実装するにはどうすればよいですか?カフェインやグアバキャッシュなどのライブラリを使用して、Javaアプリケーションにマルチレベルキャッシュを実装するにはどうすればよいですか?Mar 17, 2025 pm 05:44 PM

    この記事では、カフェインとグアバキャッシュを使用してJavaでマルチレベルキャッシュを実装してアプリケーションのパフォーマンスを向上させています。セットアップ、統合、パフォーマンスの利点をカバーし、構成と立ち退きポリシー管理Best Pra

    キャッシュや怠zyなロードなどの高度な機能を備えたオブジェクトリレーショナルマッピングにJPA(Java Persistence API)を使用するにはどうすればよいですか?キャッシュや怠zyなロードなどの高度な機能を備えたオブジェクトリレーショナルマッピングにJPA(Java Persistence API)を使用するにはどうすればよいですか?Mar 17, 2025 pm 05:43 PM

    この記事では、キャッシュや怠zyなロードなどの高度な機能を備えたオブジェクトリレーショナルマッピングにJPAを使用することについて説明します。潜在的な落とし穴を強調しながら、パフォーマンスを最適化するためのセットアップ、エンティティマッピング、およびベストプラクティスをカバーしています。[159文字]

    Javaのクラスロードメカニズムは、さまざまなクラスローダーやその委任モデルを含むどのように機能しますか?Javaのクラスロードメカニズムは、さまざまなクラスローダーやその委任モデルを含むどのように機能しますか?Mar 17, 2025 pm 05:35 PM

    Javaのクラスロードには、ブートストラップ、拡張機能、およびアプリケーションクラスローダーを備えた階層システムを使用して、クラスの読み込み、リンク、および初期化が含まれます。親の委任モデルは、コアクラスが最初にロードされ、カスタムクラスのLOAに影響を与えることを保証します

    See all articles

    ホットAIツール

    Undresser.AI Undress

    Undresser.AI Undress

    リアルなヌード写真を作成する AI 搭載アプリ

    AI Clothes Remover

    AI Clothes Remover

    写真から衣服を削除するオンライン AI ツール。

    Undress AI Tool

    Undress AI Tool

    脱衣画像を無料で

    Clothoff.io

    Clothoff.io

    AI衣類リムーバー

    AI Hentai Generator

    AI Hentai Generator

    AIヘンタイを無料で生成します。

    ホットツール

    VSCode Windows 64 ビットのダウンロード

    VSCode Windows 64 ビットのダウンロード

    Microsoft によって発売された無料で強力な IDE エディター

    DVWA

    DVWA

    Damn Vulnerable Web App (DVWA) は、非常に脆弱な PHP/MySQL Web アプリケーションです。その主な目的は、セキュリティ専門家が法的環境でスキルとツールをテストするのに役立ち、Web 開発者が Web アプリケーションを保護するプロセスをより深く理解できるようにし、教師/生徒が教室環境で Web アプリケーションを教え/学習できるようにすることです。安全。 DVWA の目標は、シンプルでわかりやすいインターフェイスを通じて、さまざまな難易度で最も一般的な Web 脆弱性のいくつかを実践することです。このソフトウェアは、

    SublimeText3 Linux 新バージョン

    SublimeText3 Linux 新バージョン

    SublimeText3 Linux 最新バージョン

    ドリームウィーバー CS6

    ドリームウィーバー CS6

    ビジュアル Web 開発ツール

    MantisBT

    MantisBT

    Mantis は、製品の欠陥追跡を支援するために設計された、導入が簡単な Web ベースの欠陥追跡ツールです。 PHP、MySQL、Web サーバーが必要です。デモおよびホスティング サービスをチェックしてください。