RocketMQ生產(chǎn)者為什么需要負(fù)載均衡?
在RocketMQ中,隊(duì)列是消息發(fā)送的基本單位。每個(gè)Topic下可能存在多個(gè)隊(duì)列,因此一個(gè)生產(chǎn)者實(shí)例可以向不同的隊(duì)列發(fā)送消息。當(dāng)生產(chǎn)者發(fā)送消息時(shí),如果不能均衡的將消息發(fā)送到不同的隊(duì)列,那么會(huì)導(dǎo)致隊(duì)列里的消息分布不均衡,這樣最終會(huì)導(dǎo)致消息性能下降,因此生產(chǎn)者負(fù)載均衡機(jī)制也是非常重要的。
RocketMQ生產(chǎn)者原理分析
既然生產(chǎn)者負(fù)載均衡如此重要,我們看下是如何實(shí)現(xiàn)的。
我們通常使用如下方法發(fā)送消息:
構(gòu)建消息 Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); //發(fā)送消息 SendResult sendResult = producer.send(msg);
RocketMQ發(fā)送消息的核心邏輯在DefaultMQProducerImpl類(lèi)sendDefaultImpl。
在發(fā)送消息流程利里面有一行非常關(guān)鍵的邏輯,selectOneMessageQueue,看方法名稱(chēng)就可以知道其含義,選擇一個(gè)消息隊(duì)列。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }
里面是通過(guò)策略類(lèi)來(lái)實(shí)現(xiàn)的。
策略類(lèi)最終通過(guò)org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 實(shí)現(xiàn)。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //生產(chǎn)者第一次發(fā)消息 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //非第一次,重試發(fā)消息的情況, for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //重試的情況,不取上一個(gè)broker的隊(duì)列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } 第一次發(fā)消息選擇隊(duì)列核心邏輯在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue() //線(xiàn)程安全的index private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); public MessageQueue selectOneMessageQueue() { //獲取一個(gè)基礎(chǔ)索引,每次自增1 這個(gè)全局存在TopicPublishInfo 每一個(gè)topic int index = this.sendWhichQueue.getAndIncrement(); // 基礎(chǔ)索引和 消息寫(xiě)隊(duì)列大小 進(jìn)行取模 用來(lái)實(shí)現(xiàn)輪訓(xùn)的算法 int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
哈哈,這里就是生產(chǎn)者負(fù)載均衡輪詢(xún)機(jī)制的核心邏輯了,使用到了ThreadLocal技術(shù),sendWhichQueue為每個(gè)生產(chǎn)者線(xiàn)程維護(hù)一個(gè)自己的下標(biāo)索引。
基礎(chǔ)索引計(jì)算器,使用ThreadLocal技術(shù)針對(duì)不同的生產(chǎn)者線(xiàn)程第一次隨機(jī),后面遞增,可以更加負(fù)載均衡。
public class ThreadLocalIndex { //關(guān)鍵技術(shù) private final ThreadLocalthreadLocalIndex = new ThreadLocal (); private final Random random = new Random(); public int getAndIncrement() { Integer index = this.threadLocalIndex.get(); if (null == index) { //第一次隨機(jī) index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } //第二次索引位置開(kāi)始自增1 index = Math.abs(index + 1); if (index < 0) index = 0; this.threadLocalIndex.set(index); return index; } }
哈哈,有沒(méi)有覺(jué)得這個(gè)實(shí)現(xiàn)非常巧妙了。不同的生產(chǎn)者線(xiàn)程都擁有自己的索引因子,分配隊(duì)列更加均衡。
總結(jié)
本文分析了RocketMQ生產(chǎn)者底層的實(shí)現(xiàn),設(shè)計(jì)地方有巧妙之處,值得我們學(xué)習(xí),上面是發(fā)送非順序消息的場(chǎng)景, 如果是順序消息,我們作為使用者可以指定負(fù)載均衡策略。
編輯:黃飛
-
負(fù)載均衡
+關(guān)注
關(guān)注
0文章
113瀏覽量
12396 -
線(xiàn)程
+關(guān)注
關(guān)注
0文章
507瀏覽量
19763 -
消息隊(duì)列
+關(guān)注
關(guān)注
0文章
33瀏覽量
3018
原文標(biāo)題:RocketMQ生產(chǎn)者負(fù)載均衡(輪詢(xún)機(jī)制)核心原理
文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論