Maison  >  Article  >  Java  >  Analyse du développement Java Exemple de haute disponibilité du producteur RocketMQ

Analyse du développement Java Exemple de haute disponibilité du producteur RocketMQ

WBOY
WBOYavant
2023-04-23 23:28:06963parcourir

    1 Message

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
        //主题名字
        private String topic;
        //消息扩展信息,Tag,keys,延迟级别都存在这里
        private Map<String, String> properties;
        //消息体,字节数组
        private byte[] body;
        //设置消息的key,
        public void setKeys(String keys) {}
        //设置topic
        public void setTopic(String topic) {}
        //延迟级别
        public int setDelayTimeLevel(int level) {}
        //消息过滤的标记
        public void setTags(String tags) {}
        //扩展信息存放在此
        public void putUserProperty(final String name, final String value) {}
    }

    Le message, ce sont les enfants. Ces enfants ont leurs propres caractéristiques mais ont aussi des points communs. Deux enfants envoyés par le même parent peuvent aller au même endroit, ou bien ils peuvent aller dans des endroits différents.

    1.1 topic

    Tout d'abord, chaque message enfant a un attribut topic, dont nous avons parlé plus haut, c'est une salle d'attente. Une fois que les enfants sont entrés, ils ont marché jusqu'à la zone désignée de leur salle d'attente désignée (ne prennent-ils pas habituellement le train et le train à grande vitesse sur le quai désigné ?), se sont assis dans la file d'attente des messagessièges, et attendit leur voyage.

    Le courtier a un ou plusieurs sujets, et les messages seront stockés dans la file d'attente des messages du sujet, en attente d'être consommés.

    1.2 Corps

    Child News a également un attribut Corps, qui est sa capacité Il peut dessiner, il peut chanter et il peut faire ce qu'il veut, qui est enregistré dans cet attribut Corps. Lorsque vous sortez, l'endroit où la valeur se reflète est également l'attribut Corps.

    Body est le corps du message et les consommateurs effectueront les opérations correspondantes en fonction du corps du message.

    Étiquette 1.3

    Nous avons mentionné cette étiquette dans la section précédente, c'est une marque Certains enfants portent des planches à dessin et des appareils photo, et certains bateaux de croisière trouveront spécialement ces enfants et les emmèneront pour accomplir leurs tâches.

    Vous pouvez définir des attributs de balise pour les messages et les consommateurs peuvent choisir des messages contenant des attributs de balise spécifiques à consommer.

    1.4 key

    key est le nom du message de chaque enfant. Si vous voulez retrouver un enfant, appelez-le simplement par son nom.

    Définissez la clé pour le message envoyé et vous pourrez rechercher des messages basés sur cette clé à l'avenir. Par exemple, si le message est anormal ou si le message est perdu, il sera très pratique de le rechercher.

    Niveau de délai 1,5

    Bien sûr, certains enfants ne sont pas pressés de partir. Ils y ont réfléchi avant de venir. Il leur faudra 30 minutes pour prendre un repas, ils attendront donc 30 minutes avant d'être récupérés.

    Définissez le niveau de délai pour spécifier la durée pendant laquelle le message peut être consommé.

    2 Producteur Haute Disponibilité

    Chaque parent qui envoie ses enfants veut envoyer ses enfants dans la salle d'attente, et ils ne veulent pas que leurs enfants soient perdus En ce moment, la salle d'attente a besoin de mécanismes de garantie.

    2.1 Le client assure la haute disponibilité du producteur

    2.1.1 Mécanisme de nouvelle tentative

    C'est-à-dire qu'après que le parent a envoyé l'enfant, après être entré dans la salle d'attente, l'enfant n'a pas réussi à s'asseoir dans la file d'attente des messages , et cela fonctionnait à ce moment-là. Le personnel fera un nouvel essai pour voir s'il y a des places disponibles. Le nombre de tentatives par défaut est de 2 fois, ce qui signifie que l'enfant qui reçoit le message a un total de 3 occasions de trouver une place.

    Regardez le code source, j'ai spécialement ajouté des commentaires, pour que vous puissiez le comprendre à peu près.

    //这里取到了重试的次数
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        //获取消息队列
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                //发送消息
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                ...
            } catch (RemotingException e) {
                ...
                continue;
            } catch (MQClientException e) {
                ...
                continue;
            } catch (MQBrokerException e) {
                ...
                continue;
            } catch (InterruptedException e) {
                //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试
                throw e;
            }
        } else {
            break;
        }
    }

    Le code de nouvelle tentative est comme ci-dessus. Dans cette méthode sendDefaultImpl, il essaiera d'envoyer le message trois fois. S'il échoue, l'erreur correspondante sera générée. sendDefaultImpl方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。

    2.1.2 客户端容错

    若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤比较容易进入的来进入。当然那些已经关闭的停电的没有服务能力的,我们是不会进的。

    MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.

    选择Broker就是在选择message queue

    2.1.2 Tolérance aux pannes du client

    S'il y a plusieurs salles d'attente des courtiers, le personnel de service s'arrangera pour envoyer un message à l'enfant afin qu'il en choisisse une relativement moins fréquentée et plus facile d'accès. Bien entendu, nous n'entrerons pas dans ceux qui ont été fermés

    ,

    hors tension, n'ont aucune capacité de service.

    Le client MQ conservera les informations sur le délai d'envoi d'un courtier et, sur la base de ces informations, il sélectionnera un courtier avec un délai relativement faible pour envoyer le message. Éliminera activement les courtiers qui sont en panne, indisponibles ou qui ont un niveau de retard d'envoi élevé.

    Sélectionner Courtier revient à sélectionner file d'attente des messages. Le code correspondant est le suivant :

    . Ici, nous allons d'abord déterminer si le

    commutateur de tolérance de délai

    est activé. Ce commutateur

    est désactivé par défaut S'il est activé, le courtier avec une latence inférieure sera prioritaire.

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //判断发送延迟容错开关是否开启
        if (this.sendLatencyFaultEnable) {
            try {
                //选择一个延迟上可以接受,并且和上次发送相同的Broker
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //若是Broker的延迟时间可以接受,则返回这个Broker
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            //若是前边都没选中一个Broker,就随机选一个Broker
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    Mais lorsque le interrupteur de tolérance de retard est dans l'état off

    , le code exécuté est le suivant : 🎜🎜Afin de répartir uniformément la pression sur le courtier, un autre courtier🎜 sera sélectionné 🎜. 🎜
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //若是没有上次的Brokername做参考,就随机选一个
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //如果有,那么就选一个其他的Broker
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //这里判断遇上一个使用的Broker不是同一个
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            //若是上边的都没选中,那么就随机选一个
            return selectOneMessageQueue();
        }
    }
    🎜2.2 Le côté courtier assure une haute disponibilité aux producteurs 🎜🎜Salle d'attente des courtiers Afin de 🎜recevoir avec précision🎜les messages des enfants, il y aura au moins deux salles, une 🎜salle principale🎜et une 🎜salle adjointe🎜 De manière générale, les enfants. Tout le monde 🎜entre dans le hall principal🎜, puis effectue une seule opération pour informer la machine (technique du clone fantôme), puis laisse le clone entrer dans le hall adjoint de cette façon, lorsque le hall principal est en panne de courant et cesse de fonctionner, le clone dans la salle des adjoints n'a qu'à le faire. Une fois la tâche terminée, tout ira bien. D'une manière générale, c'est le message du hall principal que l'enfant va prendre le bateau pour accomplir la tâche. 🎜

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration:
    Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer