public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题名字 private String topic; //消息扩展信息,Tag,keys,延迟级别都存在这里 private Map<String, String> properties; //消息体,字节数组 private byte[] body; //设置消息的key, public void setKeys(String keys) {} //设置topic public void setTopic(String topic) {} //延迟级别 public int setDelayTimeLevel(int level) {} //消息过滤的标记 public void setTags(String tags) {} //扩展信息存放在此 public void putUserProperty(final String name, final String value) {} }
Mesejnya ialah kanak-kanak ini mempunyai ciri-ciri tersendiri tetapi juga mempunyai persamaan. Dua anak yang dihantar oleh ibu bapa yang sama boleh pergi ke tempat yang sama, atau mereka boleh pergi ke tempat yang berbeza.
Pertama sekali, setiap mesej kanak-kanak mempunyai atribut topik, yang kami nyatakan di atas, ialah dewan menunggu. Selepas kanak-kanak masuk, mereka berjalan ke kawasan yang ditetapkan dewan menunggu mereka yang ditetapkan (bukankah mereka biasanya menaiki kereta api dan kereta api berkelajuan tinggi di platform yang ditetapkan apabila mereka keluar), dan duduk di barisan mesej tempat duduk Tunggu, tunggu perjalanan.
Broker mempunyai satu atau lebih topik, dan mesej akan disimpan dalam baris gilir mesej dalam topik, menunggu untuk digunakan.
Berita kanak-kanak, ada juga Atribut badan, ini dia kemampuan, dia boleh melukis, dia boleh menyanyi, dia. Apa sahaja yang anda lakukan direkodkan dalam atribut Body. Apabila anda keluar, tempat di mana nilai ditunjukkan juga ialah atribut Body.
Body ialah badan mesej dan pengguna akan melakukan operasi yang sepadan berdasarkan badan mesej.
Seperti yang kami nyatakan dalam bahagian sebelumnya, teg ini hanyalah tanda Sesetengah kanak-kanak membawa papan lukisan dan kamera di belakang mereka, dan beberapa kapal pesiar hanya temui khasnya Kanak-kanak ini menarik diri dan menyelesaikan misi mereka.
Anda boleh menetapkan atribut teg untuk mesej dan pengguna boleh memilih mesej yang mengandungi atribut teg khusus untuk digunakan.
kunci ialah nama bagi setiap mesej kanak-kanak. Kalau nak cari anak, panggil saja namanya.
Tetapkan Kunci untuk mesej yang dihantar dan anda boleh mencari mesej berdasarkan Kunci ini pada masa hadapan. Sebagai contoh, jika mesej tidak normal atau mesej hilang, ia akan menjadi sangat mudah untuk mencari.
Sudah tentu, sesetengah kanak-kanak tidak tergesa-gesa untuk pergi apabila mereka datang. Mereka akan mengambil masa 30 minit untuk makan mereka akan menunggu apabila mereka datang selepas 30 minit.
Menetapkan tahap kelewatan boleh menentukan tempoh masa yang diambil untuk mesej digunakan.
Setiap ibu bapa yang menghantar anak mereka berharap untuk menghantar mereka ke balai menunggu, dan tidak mahu anak mereka hilang, this At this masa, dewan menunggu memerlukan beberapa mekanisme jaminan.
Iaitu, selepas ibu bapa menghantar anak ke dewan menunggu, Jika anda gagal untuk duduk di kerusi giliran mesej , kakitangan akan mengatur untuk mencuba lagi untuk melihat jika terdapat tempat duduk. Bilangan lalai percubaan semula ialah 2 kali, yang bermaksud bahawa kanak-kanak mesej mempunyai sejumlah 3 peluang untuk mencari tempat duduk.
Melihat pada kod sumber, saya menambah komen khas supaya ia boleh difahami secara kasar.
//这里取到了重试的次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //获取消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试 throw e; } } else { break; } }
Kod cuba semula adalah seperti di atas dalam kaedah sendDefaultImpl
ini, ia akan cuba menghantar mesej tiga kali Jika gagal, ralat yang sepadan akan dilemparkan.
Jika terdapat beberapa ruang menunggu Broker, kakitangan perkhidmatan akan mengaturkan pesanan kepada kanak-kanak untuk memilih satu agak kurang sesak, bandingkan Mudah untuk masuk untuk masuk. Sudah tentu, kami tidak akan memasukkan mereka yang tertutup, kehabisan kuasa, atau tidak mempunyai keupayaan perkhidmatan.
MQ Client akan mengekalkan maklumat kelewatan penghantaran Broker, dan berdasarkan maklumat ini akan memilih Broker dengan kelewatan yang agak rendah untuk menghantar mesej. Broker yang turun, tidak tersedia atau mempunyai tahap kelewatan penghantaran yang tinggi akan dihapuskan secara aktif
Memilih Broker
sedang memilih message queue
Kod yang sepadan adalah seperti berikut:
Ini akan berlaku yang pertama Tentukan sama ada suis toleransi kelewatan dihidupkan ini dimatikan secara lalai Jika ia dihidupkan, Broker dengan pendaman lebih rendah akan diberi keutamaan.
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判断发送延迟容错开关是否开启 if (this.sendLatencyFaultEnable) { try { //选择一个延迟上可以接受,并且和上次发送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延迟时间可以接受,则返回这个Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前边都没选中一个Broker,就随机选一个Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }Tetapi apabila
suis toleransi kesalahan kelewatan berada dalam keadaan mati, kod yang dilaksanakan adalah seperti berikut:
Untuk sama rata mengagihkan tekanan pada Broker, Brokeryang berbeza daripada yang sebelumnya akan dipilih.
2.2 Bahagian broker memastikan ketersediaan tinggi pengeluarpublic MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是没有上次的Brokername做参考,就随机选一个 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就选一个其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //这里判断遇上一个使用的Broker不是同一个 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectOneMessageQueue(); } }
dengan tepat kepada kanak-kanak, akan ada sekurang-kurangnya dua dewan, satu
Atas ialah kandungan terperinci Analisis contoh ketersediaan tinggi pengeluar RocketMQ pembangunan Java. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!