hi,你好!欢迎访问本站!登录
本站由网站地图腾讯云宝塔系统阿里云强势驱动
当前位置:首页 - 教程 - 杂谈 - 正文 君子好学,自强不息!

关于RocketMQ音讯消耗与重均衡的一些问题讨论

2019-11-18杂谈搜奇网53°c
A+ A-

实在最好的进修体式格局就是相互交换,近来也有跟网友议论了一些关于 RocketMQ 音讯拉取与重均衡的题目,我权且在这里写下我的一些总结。

关于 push 形式下的音讯轮回拉取题目

之前宣布了一篇关于重均衡的文章:「Kafka 重均衡机制」,内里有说到 RocketMQ 重均衡机制是每隔 20s 从恣意一个 Broker 节点猎取消耗组的消耗 ID 以及定阅信息,再依据这些定阅信息举行分派,然后将分派到的信息封装成 pullRequest 对象 pull 到 pullRequestQueue 行列中,拉取线程叫醒后实行拉取使命,流程图以下:

然则个中有一些是没有细致说的,比方每次拉音讯都要等 20s 吗?真的有个网友问了我以下题目:

很显然他的项目是用了 push 形式举行音讯拉取,要回覆这个题目,就要从 RockeMQ 的音讯拉取提及:

RocketMQ 的 push 形式的完成是基于 pull 形式,只不过在 pull 形式上套了一层,所以RocketMQ push 形式并非真正意义上的 ”推形式“,因而,在 push 形式下,消耗者拉取完音讯后,立马就有最先下一个拉取使命,并不会真的等 20s 重均衡后才拉取,至于 push 形式是怎样完成的,那就从源码去找答案。

之前有写过一篇文章:「RocketMQ为何要保证定阅关联的一致性?」,内里有说过 音讯拉取是从 PullRequestQueue 壅塞行列中掏出 PullRequest 拉取使命举行音讯拉取的,但 PullRequest 是怎样放进 PullRequestQueue 壅塞行列中的呢?

RocketMQ 一共供应了以下要领:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:

public void executePullRequestImmediately(final PullRequest pullRequest) {
  try {
    this.pullRequestQueue.put(pullRequest);
  } catch (InterruptedException e) {
    log.error("executePullRequestImmediately pullRequestQueue.put", e);
  }
}

从挪用链发明,除了重均衡会挪用该要领以外,在 push 形式下,PullCallback 回调对象中的 onSuccess 要领在音讯消耗时,也挪用了该要领:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

case FOUND:

// 假如本次拉取音讯为空,则继续将pullRequest放入壅塞行列中
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
  // 将音讯放入消耗者消耗线程去实行
  DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
    pullResult.getMsgFoundList(), //
    processQueue, //
    pullRequest.getMessageQueue(), //
    dispathToConsume);
  // 将pullRequest放入壅塞行列中
  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);  
}

当从 broker 拉取到音讯后,假如音讯被过滤掉,则继续将pullRequest放入壅塞行列中继续轮回实行音讯拉取使命,否则将音讯放入消耗者消耗线程去实行,在pullRequest放入壅塞行列中。

case NO_NEW_MESSAGE:

case NO_MATCHED_MSG:

pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

假如从 broker 端没有可拉取的新音讯或许没有匹配到音讯,则将pullRequest放入壅塞行列中继续轮回实行音讯拉取使命。

从以上音讯消耗逻辑能够看出,当音讯处置惩罚完后,立行将 pullRequest 从新放入壅塞行列中,因而这就很好诠释为何 push 形式能够延续拉取音讯了:

在 push 形式下音讯消耗完后,还会挪用该要领从新将 PullRequest 对象放进 PullRequestQueue 壅塞行列中,不断地从 broker 中拉取音讯,完成 push 结果。

重均衡后行列被别的消耗者分派后怎样处置惩罚?

继续再想一个题目,假如重均衡后,发明某个行列被新的消耗者分派了,怎样办,总不能继续从该行列中拉取音讯吧?

RocketMQ 重均衡后会搜检 pullRequest 是不是还在新分派的列表中,假如不在,则抛弃,挪用 isDrop() 可查出该pullRequest是不是已抛弃:

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:

final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
  log.info("the pull request[{}] is dropped.", pullRequest.toString());
  return;
}

在音讯拉取之前,起首推断该行列是不是被抛弃,假如已抛弃,则直接摒弃本次拉取使命。

那什么时候行列被抛弃呢?

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
  Entry<MessageQueue, ProcessQueue> next = it.next();
  MessageQueue mq = next.getKey();
  ProcessQueue pq = next.getValue();

  if (mq.getTopic().equals(topic)) {
    // 推断当前缓存 MessageQueue 是不是包括在最新的 mqSet 中,假如不存在则将行列抛弃
    if (!mqSet.contains(mq)) {
      pq.setDropped(true);
      if (this.removeUnnecessaryMessageQueue(mq, pq)) {
        it.remove();
        changed = true;
        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
      }
    } else if (pq.isPullExpired()) {
      // 假如行列拉取逾期则抛弃
      switch (this.consumeType()) {
        case CONSUME_ACTIVELY:
          break;
        case CONSUME_PASSIVELY:
          pq.setDropped(true);
          if (this.removeUnnecessaryMessageQueue(mq, pq)) {
            it.remove();
            changed = true;
            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                      consumerGroup, mq);
          }
          break;
        default:
          break;
      }
    }
  }
}

updateProcessQueueTableInRebalance 要领在重均衡时实行,用于更新 processQueueTable,它是当前消耗者的行列缓存列表,以上要领逻辑推断当前缓存 MessageQueue 是不是包括在最新的 mqSet 中,假如不包括个中,则申明经由此次重均衡后,该行列被分派给别的消耗者了,或许拉取时候距离太大逾期了,则挪用 setDropped(true) 要领将行列置为抛弃状况。

能够你会问,processQueueTable 跟 pullRequest 内里 processQueue 有什么关联,往下看:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

// 新建 ProcessQueue 
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
  // 将ProcessQueue放入processQueueTable中
  ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
  if (pre != null) {
    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
  } else {
    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    PullRequest pullRequest = new PullRequest();
    pullRequest.setConsumerGroup(consumerGroup);
    pullRequest.setNextOffset(nextOffset);
    pullRequest.setMessageQueue(mq);
    // 将ProcessQueue放入pullRequest拉取使命对象中
    pullRequest.setProcessQueue(pq);
    pullRequestList.add(pullRequest);
    changed = true;
  }
}

能够看出,重均衡时会建立 ProcessQueue 对象,将其放入 processQueueTable 缓存行列表中,再将其放入 pullRequest 拉取使命对象中,也就是 processQueueTable 中的 ProcessQueue 与 pullRequest 的中 ProcessQueue 是同一个对象。

重均衡后会致使音讯反复消耗吗?

之前在群里有个网友提了这个题目:

我当时回覆他 RocketMQ 一般也是没有反复消耗,但厥后发明实在 RocketMQ 在某些状况下,也是会涌现音讯反复消耗的征象。

前面讲到,RocketMQ 音讯消耗时,会将音讯放进消耗线程中去实行,代码以下:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
  pullResult.getMsgFoundList(), //
  processQueue, //
  pullRequest.getMessageQueue(), //
  dispathToConsume);

ConsumeMessageService 类完成音讯消耗的逻辑,它有两个完成类:

// 并发音讯消耗逻辑完成类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 递次音讯消耗逻辑完成类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

先看并发音讯消耗相干处置惩罚逻辑:

ConsumeMessageConcurrentlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:

if (this.processQueue.isDropped()) {
  log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
  return;
}

// 音讯消耗逻辑
// ...

// 假如行列被设置为抛弃状况,则不提交音讯消耗进度
if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}

ConsumeRequest 是一个继续了 Runnable 的类,它是音讯消耗中心逻辑的完成类,submitConsumeRequest 要领将 ConsumeRequest 放入 消耗线程池中实行音讯消耗,从它的 run 要领中可看出,假如在实行音讯消耗逻辑中有节点到场,重均衡后该行列被分派给别的节点举行消耗了,此时的行列被抛弃,则不提交音讯消耗进度,由于之前已消耗了,此时就会形成音讯反复消耗的状况。

再来看看递次消耗相干处置惩罚逻辑:

ConsumeMessageOrderlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:

public void run() {
  // 推断行列是不是被抛弃
  if (this.processQueue.isDropped()) {
    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
    return;
  }

  final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
  synchronized (objLock) {
    // 假如不是播送形式,且行列已加锁且锁没有逾期
    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
      final long beginTime = System.currentTimeMillis();
      for (boolean continueConsume = true; continueConsume; ) {
        // 再次推断行列是不是被抛弃
        if (this.processQueue.isDropped()) {
          log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
          break;
        }
        
        // 音讯消耗处置惩罚逻辑
        // ...
        
          continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
        } else {
          continueConsume = false;
        }
      }
    } else {
      if (this.processQueue.isDropped()) {
        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
      }
      ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
    }
  }
}

RocketMQ 递次音讯消耗会将行列锁定,当行列猎取锁以后才举行消耗,所以,纵然音讯在消耗历程中有节点到场,重均衡后该行列被分派给别的节点举行消耗了,此时的行列被抛弃,依旧不会形成反复消耗。

更多精彩文章请关注作者保护的民众号「后端进阶」,这是一个专注后端相干手艺的民众号。
关注民众号并复兴「后端」免费领取后端相干电子书籍。
迎接分享,转载请保存出处。

  选择打赏方式
微信赞助

打赏

QQ钱包

打赏

支付宝赞助

打赏

  移步手机端
关于RocketMQ音讯消耗与重均衡的一些问题讨论

1、打开你手机的二维码扫描APP
2、扫描左则的二维码
3、点击扫描获得的网址
4、可以在手机端阅读此文章
未定义标签

本文来源:搜奇网

本文地址:https://www.sou7.cn/282415.html

关注我们:微信搜索“搜奇网”添加我为好友

版权声明: 本文仅代表作者个人观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。请记住本站网址https://www.sou7.cn/搜奇网。

发表评论

选填

必填

必填

选填

请拖动滑块解锁
>>