A look at RocketMQ's MQFaultStrategy
- December 12, 2019
- Notes
This article takes a look at RocketMQ's MQFaultStrategy.
MQFaultStrategy
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java
    public class MQFaultStrategy {
        private final static InternalLogger log = ClientLogger.getLog();
        private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

        private boolean sendLatencyFaultEnable = false;

        private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
        private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

        public long[] getNotAvailableDuration() {
            return notAvailableDuration;
        }

        public void setNotAvailableDuration(final long[] notAvailableDuration) {
            this.notAvailableDuration = notAvailableDuration;
        }

        public long[] getLatencyMax() {
            return latencyMax;
        }

        public void setLatencyMax(final long[] latencyMax) {
            this.latencyMax = latencyMax;
        }

        public boolean isSendLatencyFaultEnable() {
            return sendLatencyFaultEnable;
        }

        public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
            this.sendLatencyFaultEnable = sendLatencyFaultEnable;
        }

        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            if (this.sendLatencyFaultEnable) {
                try {
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }

                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (notBestBroker != null) {
                            mq.setBrokerName(notBestBroker);
                            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                        }
                        return mq;
                    } else {
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }

                return tpInfo.selectOneMessageQueue();
            }

            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }

        public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
            if (this.sendLatencyFaultEnable) {
                long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
                this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
            }
        }

        private long computeNotAvailableDuration(final long currentLatency) {
            for (int i = latencyMax.length - 1; i >= 0; i--) {
                if (currentLatency >= latencyMax[i])
                    return this.notAvailableDuration[i];
            }

            return 0;
        }
    }
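- `MQFaultStrategy` defines the fields `latencyFaultTolerance`, `sendLatencyFaultEnable`, `latencyMax`, and `notAvailableDuration`. When `sendLatencyFaultEnable` is false, its `selectOneMessageQueue` method simply returns `tpInfo.selectOneMessageQueue(lastBrokerName)`. When `sendLatencyFaultEnable` is true, it first obtains `index` from `tpInfo.getSendWhichQueue().getAndIncrement()`, then iterates over `tpInfo.getMessageQueueList()`, computing `pos` as `Math.abs(index++) % tpInfo.getMessageQueueList().size()` and resetting it to 0 if it is negative. For each candidate it calls `latencyFaultTolerance.isAvailable` on the queue's broker name, and returns the `MessageQueue` if the broker is available and either `null == lastBrokerName` or `mq.getBrokerName().equals(lastBrokerName)` holds.
- If the loop over `tpInfo.getMessageQueueList()` returns no queue, `selectOneMessageQueue` picks `notBestBroker` via `latencyFaultTolerance.pickOneAtLeast()` and reads `writeQueueNums` from `tpInfo.getQueueIdByBroker(notBestBroker)`. If `writeQueueNums` is greater than 0, it selects a `MessageQueue` via `tpInfo.selectOneMessageQueue()` and, provided `notBestBroker` is not null, sets its `brokerName` to `notBestBroker` and its `queueId` to `tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums`. If `writeQueueNums` is less than or equal to 0, it calls `latencyFaultTolerance.remove(notBestBroker)`. If none of the steps above returned a queue, or an exception was caught and logged, it falls back to `tpInfo.selectOneMessageQueue()`.
- When `sendLatencyFaultEnable` is true, `updateFaultItem` computes `duration` with `computeNotAvailableDuration`, passing 30000 when `isolation` is true and `currentLatency` otherwise, and then calls `latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration)`. `computeNotAvailableDuration` walks `latencyMax` from the last element to the first and returns `notAvailableDuration[i]` for the first `i` where `currentLatency >= latencyMax[i]`; if no threshold matches, it returns 0.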
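To make the threshold lookup concrete, here is a minimal standalone sketch that copies the two default arrays from the listing above and re-implements the lookup outside RocketMQ. The class name `NotAvailableDurationDemo` and the helper `durationFor` are made up for illustration and are not part of the RocketMQ client.

```java
final class NotAvailableDurationDemo {
    // Default values copied from MQFaultStrategy (all in milliseconds).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Same lookup as computeNotAvailableDuration: scan from the largest threshold down.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // Hypothetical helper mirroring the argument that updateFaultItem passes in.
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }

    public static void main(String[] args) {
        System.out.println(durationFor(40, false));   // 0      (below every threshold)
        System.out.println(durationFor(549, false));  // 0      (matches the 100 ms threshold)
        System.out.println(durationFor(550, false));  // 30000  (matches the 550 ms threshold)
        System.out.println(durationFor(1200, false)); // 60000  (matches the 1000 ms threshold)
        System.out.println(durationFor(5, true));     // 600000 (isolation is treated as 30000 ms)
    }
}
```

With the default arrays, any latency below 550 ms yields a duration of 0, while an isolated broker is treated as having a 30000 ms latency and therefore gets the largest duration, 600000 ms.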
DefaultMQProducerImpl
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
    public class DefaultMQProducerImpl implements MQProducerInner {
        private final InternalLogger log = ClientLogger.getLog();
        private final Random random = new Random();
        private final DefaultMQProducer defaultMQProducer;
        private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
            new ConcurrentHashMap<String, TopicPublishInfo>();
        private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
        private final RPCHook rpcHook;
        private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
        private final ExecutorService defaultAsyncSenderExecutor;
        private final Timer timer = new Timer("RequestHouseKeepingService", true);
        protected BlockingQueue<Runnable> checkRequestQueue;
        protected ExecutorService checkExecutor;
        private ServiceState serviceState = ServiceState.CREATE_JUST;
        private MQClientInstance mQClientFactory;
        private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
        private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
        private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
        private ExecutorService asyncSenderExecutor;

        //......

        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
        }

        public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
            this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
        }

        //......
    }
- The `selectOneMessageQueue` and `updateFaultItem` methods of `DefaultMQProducerImpl` both delegate to `mqFaultStrategy`.
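The listing above elides the call sites of these two delegating methods. As a rough illustration of how a caller could combine them, the sketch below assumes a retry loop that selects a target, times the send, and reports the elapsed latency with `isolation` set to false on success and true on failure. Everything here is hypothetical: `SendLoopSketch`, `FaultStrategy`, `Transport`, `selectBroker`, and `sendWithRetry` are illustrative names, brokers are reduced to plain strings, and the loop is not taken from the RocketMQ source.

```java
final class SendLoopSketch {
    /** Hypothetical stand-in for the two delegated methods, with brokers as plain strings. */
    interface FaultStrategy {
        String selectBroker(String lastBrokerName);
        void updateFaultItem(String brokerName, long currentLatency, boolean isolation);
    }

    /** Hypothetical transport; throwing simulates a failed send. */
    interface Transport {
        void send(String brokerName, String body) throws Exception;
    }

    /** Tries up to maxAttempts times and reports every outcome to the strategy. */
    static boolean sendWithRetry(FaultStrategy strategy, Transport transport, String body, int maxAttempts) {
        String lastBrokerName = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String broker = strategy.selectBroker(lastBrokerName);
            lastBrokerName = broker;
            long begin = System.currentTimeMillis();
            try {
                transport.send(broker, body);
                // Success: report the measured latency, no isolation.
                strategy.updateFaultItem(broker, System.currentTimeMillis() - begin, false);
                return true;
            } catch (Exception e) {
                // Failure: ask the strategy to isolate this broker for a while.
                strategy.updateFaultItem(broker, System.currentTimeMillis() - begin, true);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        FaultStrategy strategy = new FaultStrategy() {
            private int next = 0;

            public String selectBroker(String lastBrokerName) {
                return "broker-" + (next++ % 2 == 0 ? "a" : "b"); // alternate a, b, a, ...
            }

            public void updateFaultItem(String brokerName, long currentLatency, boolean isolation) {
                System.out.println(brokerName + " isolation=" + isolation);
            }
        };
        Transport transport = (broker, body) -> {
            if (broker.equals("broker-a")) {
                throw new Exception("simulated failure");
            }
        };
        System.out.println(sendWithRetry(strategy, transport, "hello", 3));
    }
}
```

In this demo the first attempt against `broker-a` fails and is reported with `isolation=true`, and the second attempt against `broker-b` succeeds and is reported with `isolation=false`.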
Summary
- With `sendLatencyFaultEnable` set to false (the default), `MQFaultStrategy.selectOneMessageQueue` returns `tpInfo.selectOneMessageQueue(lastBrokerName)`. With it set to true, the method iterates over the queue list starting from `tpInfo.getSendWhichQueue().getAndIncrement()`, computes `pos` as `Math.abs(index++) % tpInfo.getMessageQueueList().size()` (reset to 0 if negative), and returns the first queue whose broker passes `latencyFaultTolerance.isAvailable` and for which `null == lastBrokerName` or `mq.getBrokerName().equals(lastBrokerName)` holds.
- If that loop returns nothing, the method falls back to the broker chosen by `latencyFaultTolerance.pickOneAtLeast()`: when that broker's `writeQueueNums` is greater than 0 it returns a queue redirected to it, otherwise it removes the broker from `latencyFaultTolerance` and returns `tpInfo.selectOneMessageQueue()`.
- `updateFaultItem` does nothing unless `sendLatencyFaultEnable` is true; it maps the latency (or 30000 when `isolation` is true) to a duration via `computeNotAvailableDuration`, which scans `latencyMax` from the end and returns the matching `notAvailableDuration[i]` or 0, and passes the result to `latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration)`.
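The `pos < 0` guard in the selection loop can look redundant after `Math.abs`, but `Math.abs(Integer.MIN_VALUE)` is still `Integer.MIN_VALUE` in Java, so the remainder can be negative. The sketch below isolates that arithmetic; `QueueIndexDemo` and `position` are illustrative names, not RocketMQ code, and whether the counter ever reaches `Integer.MIN_VALUE` in practice is not something the listing shows.

```java
final class QueueIndexDemo {
    // Same arithmetic as the loop body in selectOneMessageQueue.
    static int position(int index, int queueCount) {
        int pos = Math.abs(index) % queueCount;
        if (pos < 0) {
            pos = 0; // only reachable when index == Integer.MIN_VALUE
        }
        return pos;
    }

    public static void main(String[] args) {
        System.out.println(position(5, 4));                     // 1
        System.out.println(position(-3, 4));                    // 3
        System.out.println(Math.abs(Integer.MIN_VALUE) < 0);    // true
        System.out.println(Integer.MIN_VALUE % 3);              // -2
        System.out.println(position(Integer.MIN_VALUE, 3));     // 0 (guard applied)
    }
}
```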
doc
- MQFaultStrategy