关于RocketMQ音讯消耗与重均衡的一些问题讨论
2019-11-18杂谈搜奇网61°c
A+ A-实在最好的进修体式格局就是相互交换,近来也有跟网友议论了一些关于 RocketMQ 音讯拉取与重均衡的题目,我权且在这里写下我的一些总结。
关于 push 形式下的音讯轮回拉取题目
之前宣布了一篇关于重均衡的文章:「Kafka 重均衡机制」,内里有说到 RocketMQ 重均衡机制是每隔 20s 从恣意一个 Broker 节点猎取消耗组的消耗 ID 以及定阅信息,再依据这些定阅信息举行分派,然后将分派到的信息封装成 pullRequest 对象 pull 到 pullRequestQueue 行列中,拉取线程叫醒后实行拉取使命,流程图以下:
然则个中有一些是没有细致说的,比方每次拉音讯都要等 20s 吗?真的有个网友问了我以下题目:
很显然他的项目是用了 push 形式举行音讯拉取,要回覆这个题目,就要从 RockeMQ 的音讯拉取提及:
RocketMQ 的 push 形式的完成是基于 pull 形式,只不过在 pull 形式上套了一层,所以RocketMQ push 形式并非真正意义上的 ”推形式“,因而,在 push 形式下,消耗者拉取完音讯后,立马就有最先下一个拉取使命,并不会真的等 20s 重均衡后才拉取,至于 push 形式是怎样完成的,那就从源码去找答案。
之前有写过一篇文章:「RocketMQ为何要保证定阅关联的一致性?」,内里有说过 音讯拉取是从 PullRequestQueue 壅塞行列中掏出 PullRequest 拉取使命举行音讯拉取的,但 PullRequest 是怎样放进 PullRequestQueue 壅塞行列中的呢?
RocketMQ 一共供应了以下要领:
org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
从挪用链发明,除了重均衡会挪用该要领以外,在 push 形式下,PullCallback 回调对象中的 onSuccess 要领在音讯消耗时,也挪用了该要领:
org.apache.rocketmq.client.consumer.PullCallback#onSuccess:
case FOUND:
// 假如本次拉取音讯为空,则继续将pullRequest放入壅塞行列中
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
// 将音讯放入消耗者消耗线程去实行
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);
// 将pullRequest放入壅塞行列中
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
当从 broker 拉取到音讯后,假如音讯被过滤掉,则继续将pullRequest放入壅塞行列中继续轮回实行音讯拉取使命,否则将音讯放入消耗者消耗线程去实行,在pullRequest放入壅塞行列中。
case NO_NEW_MESSAGE:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
假如从 broker 端没有可拉取的新音讯或许没有匹配到音讯,则将pullRequest放入壅塞行列中继续轮回实行音讯拉取使命。
从以上音讯消耗逻辑能够看出,当音讯处置惩罚完后,立行将 pullRequest 从新放入壅塞行列中,因而这就很好诠释为何 push 形式能够延续拉取音讯了:
在 push 形式下音讯消耗完后,还会挪用该要领从新将 PullRequest 对象放进 PullRequestQueue 壅塞行列中,不断地从 broker 中拉取音讯,完成 push 结果。
重均衡后行列被别的消耗者分派后怎样处置惩罚?
继续再想一个题目,假如重均衡后,发明某个行列被新的消耗者分派了,怎样办,总不能继续从该行列中拉取音讯吧?
RocketMQ 重均衡后会搜检 pullRequest 是不是还在新分派的列表中,假如不在,则抛弃,挪用 isDrop() 可查出该pullRequest是不是已抛弃:
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
在音讯拉取之前,起首推断该行列是不是被抛弃,假如已抛弃,则直接摒弃本次拉取使命。
那什么时候行列被抛弃呢?
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
// 推断当前缓存 MessageQueue 是不是包括在最新的 mqSet 中,假如不存在则将行列抛弃
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
// 假如行列拉取逾期则抛弃
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
updateProcessQueueTableInRebalance 要领在重均衡时实行,用于更新 processQueueTable,它是当前消耗者的行列缓存列表,以上要领逻辑推断当前缓存 MessageQueue 是不是包括在最新的 mqSet 中,假如不包括个中,则申明经由此次重均衡后,该行列被分派给别的消耗者了,或许拉取时候距离太大逾期了,则挪用 setDropped(true) 要领将行列置为抛弃状况。
能够你会问,processQueueTable 跟 pullRequest 内里 processQueue 有什么关联,往下看:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
// 新建 ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
// 将ProcessQueue放入processQueueTable中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
// 将ProcessQueue放入pullRequest拉取使命对象中
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
}
能够看出,重均衡时会建立 ProcessQueue 对象,将其放入 processQueueTable 缓存行列表中,再将其放入 pullRequest 拉取使命对象中,也就是 processQueueTable 中的 ProcessQueue 与 pullRequest 的中 ProcessQueue 是同一个对象。
重均衡后会致使音讯反复消耗吗?
之前在群里有个网友提了这个题目:
我当时回覆他 RocketMQ 一般也是没有反复消耗,但厥后发明实在 RocketMQ 在某些状况下,也是会涌现音讯反复消耗的征象。
前面讲到,RocketMQ 音讯消耗时,会将音讯放进消耗线程中去实行,代码以下:
org.apache.rocketmq.client.consumer.PullCallback#onSuccess:
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);
ConsumeMessageService 类完成音讯消耗的逻辑,它有两个完成类:
// 并发音讯消耗逻辑完成类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 递次音讯消耗逻辑完成类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
先看并发音讯消耗相干处置惩罚逻辑:
ConsumeMessageConcurrentlyService:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// 音讯消耗逻辑
// ...
// 假如行列被设置为抛弃状况,则不提交音讯消耗进度
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
ConsumeRequest 是一个继续了 Runnable 的类,它是音讯消耗中心逻辑的完成类,submitConsumeRequest 要领将 ConsumeRequest 放入 消耗线程池中实行音讯消耗,从它的 run 要领中可看出,假如在实行音讯消耗逻辑中有节点到场,重均衡后该行列被分派给别的节点举行消耗了,此时的行列被抛弃,则不提交音讯消耗进度,由于之前已消耗了,此时就会形成音讯反复消耗的状况。
再来看看递次消耗相干处置惩罚逻辑:
ConsumeMessageOrderlyService:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:
public void run() {
// 推断行列是不是被抛弃
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// 假如不是播送形式,且行列已加锁且锁没有逾期
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
// 再次推断行列是不是被抛弃
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 音讯消耗处置惩罚逻辑
// ...
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
RocketMQ 递次音讯消耗会将行列锁定,当行列猎取锁以后才举行消耗,所以,纵然音讯在消耗历程中有节点到场,重均衡后该行列被分派给别的节点举行消耗了,此时的行列被抛弃,依旧不会形成反复消耗。
更多精彩文章请关注作者保护的民众号「后端进阶」,这是一个专注后端相干手艺的民众号。
关注民众号并复兴「后端」免费领取后端相干电子书籍。
迎接分享,转载请保存出处。