首頁 >Java >java教程 >分析Java開發RocketMQ生產者高可用範例

分析Java開發RocketMQ生產者高可用範例

WBOY
WBOY轉載
2023-04-23 23:28:061060瀏覽

    1 訊息

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
        //主题名字
        private String topic;
        //消息扩展信息,Tag,keys,延迟级别都存在这里
        private Map<String, String> properties;
        //消息体,字节数组
        private byte[] body;
        //设置消息的key,
        public void setKeys(String keys) {}
        //设置topic
        public void setTopic(String topic) {}
        //延迟级别
        public int setDelayTimeLevel(int level) {}
        //消息过滤的标记
        public void setTags(String tags) {}
        //扩展信息存放在此
        public void putUserProperty(final String name, final String value) {}
    }

    訊息就是孩子們,這些孩子呢,有各自的特點,也有共通點。同一個家長送來的兩個孩子可以是去同一個地方的,也可以是去不同的地方的。

    1.1 topic

    首先呢,每個孩子訊息都有一個屬性topic,這個我們上文說到了,是一個候船大廳。孩子們進來之後,走到自己指定的候船大廳的指定區域(平時出門坐火車高鐵不也是指定的站台乘車麼),坐到message queue座位上等,等著出遊。

    Broker有一個或多個topic,訊息會存放到topic內的message queue內,等待被消費。

    1.2 Body

    孩子訊息,也有一個Body屬性,這就是他的能力,他會畫畫,他會唱歌,他會幹啥幹啥,就記錄在這個Body屬性裡。等走出去了,體現價值的地方也是這個Body屬性。

    Body就是訊息體,消費者會根據訊息體執行對應的操作。

    1.3 tag

    這個tag我們上節說了,就是一個標記,有的孩子背著畫板,相機,有的遊船就特意找到這些孩子拉走,完成他們的任務。

    可以為訊息設定tag屬性,消費者可以選擇含有特定tag屬性的訊息進行消費。

    1.4 key

    key就是每個孩子訊息的名字了。要找哪個孩子,喊他名就好。

    對傳送的訊息設定好 Key,以後可以根據這個Key 來找訊息。例如訊息異常,訊息遺失,進行查找會很方便。

    1.5 延遲等級

    當然,還有的孩子來就不急著走,來之前就想好了,要恰個飯,得30分鐘,所以自己來了會等30分鐘後被接走。

    設定延遲等級可以規定多久後訊息可以被消費。

    2 生產者高可用

    每個送孩子來的家長都希望能送到候船大廳裡,更不希望孩子被搞丟了,這個時候這個候船大廳就需要一些保證機制了。

    2.1 客戶端保證生產者高可用

    2.1.1 重試機制

    就是說家長送來了,孩子進到候船大廳之後,沒能成功坐到message queue座位上,這個時候工作人員會安排重試,再去看是否有座位坐。重試次數預設是2次,也就是說,訊息孩子共有3次找座位坐的機會。

    看源碼,我特意加了註解,大致可以看懂一些了。

    //这里取到了重试的次数
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        //获取消息队列
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                //发送消息
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                ...
            } catch (RemotingException e) {
                ...
                continue;
            } catch (MQClientException e) {
                ...
                continue;
            } catch (MQBrokerException e) {
                ...
                continue;
            } catch (InterruptedException e) {
                //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试
                throw e;
            }
        } else {
            break;
        }
    }

    重試程式碼如上,這個sendDefaultImpl方法中,會嘗試傳送三次訊息,若是都失敗,才會拋出對應的錯誤。

    2.1.2 客戶端容錯

    若是有多個Broker候車大廳的時候,服務人員會安排訊息孩子選擇一個相對不擁擠比較容易進入的來進入。當然那些已經關閉的停電的沒有服務能力的,我們是不會進的。

    MQ Client會維護一個Broker的發送延遲訊息,根據這個訊息會選擇一個相對延遲較低的Broker來傳送訊息。會主動剔除哪些已經宕機,不可用或發送延遲等級較高的Broker.

    選擇Broker就是在選擇message queue,對應的程式碼如下:

    這裡會先判斷延遲容錯開關是否開啟,這個開關預設是關閉的,若是開啟的話,會優先選擇延遲較低的Broker。

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //判断发送延迟容错开关是否开启
        if (this.sendLatencyFaultEnable) {
            try {
                //选择一个延迟上可以接受,并且和上次发送相同的Broker
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //若是Broker的延迟时间可以接受,则返回这个Broker
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            //若是前边都没选中一个Broker,就随机选一个Broker
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    但是當延遲容錯開關關閉狀態的時候,執行的程式碼如下:

    為了均勻分散Broker的壓力,會選擇與之前不同的Broker

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //若是没有上次的Brokername做参考,就随机选一个
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //如果有,那么就选一个其他的Broker
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //这里判断遇上一个使用的Broker不是同一个
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            //若是上边的都没选中,那么就随机选一个
            return selectOneMessageQueue();
        }
    }

    2.2 Broker端保證生產者高可用

    Broker候船大廳為了能確切的接收到訊息孩子,至少會有兩個廳,一個主廳一個副廳,一般來說孩子都會進入到主廳,然後一頓操作,卡該忙信那機資(影分身之術) ,然後讓分身進入到副廳,這樣當主廳停電了,不工作了,副廳的分身只要去完成了任務就ok的。一般來說都是主廳的消息孩子去坐船完成任務。

    以上是分析Java開發RocketMQ生產者高可用範例的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除