博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
源码分析RocketMQ同一个消费组设置不同tag,消息订阅失败问题
阅读量:7190 次
发布时间:2019-06-29

本文共 1566 字,大约阅读时间需要 5 分钟。

背景介绍

项目组使用阿里RocketMQ,对同一个消费组设置不同的tag订阅关系,出现消息丢失的问题,本文从rocketmq源码研究消息发布与订阅原理,并分析导致该问题的原因。

官方说明

  • 告诉使用者:同一个消费组,必须保持订阅关系一致
  • 为什么?它没有说!只能从源码找答案

问题复现

  1. 启动消费者1,消费组为group1,订阅topicA的消息,tag设置为tag1 || tag2
  2. 启动消费者2,消费组也为group1,也订阅topicA的消息,但是tag设置为tag3
  3. 启动生产者,生产者发送含有tag1,tag2,tag3的消息各10条
  4. 消费者1没有收到任何消息,消费者2收到部分消息

先上结论

  • 同一个消费组中,设置不同tag时,后启动的消费者会覆盖先启动的消费者设置的tag
  • tag决定了消息过滤的条件,经过服务端和客户端两层过滤,最后只有后启动的消费者才能收到部分消息

原理说明

消息如何保存

CommitLog

  • 保存所有topic的原始消息
  • CommitLog分为多个文件,每个文件默认最大为1G
  • 每条记录包括:消息长度和消息文本(消息体,属性,uid等等)
  • 因每条消息长度不一致,每个commitLog的记录长度也不一致

ConsumerQueue

  • 保存某个Topic下某个Queue索引信息
  • 每条记录包括:消息在commitLog中的offset,消息大小,消息tag的哈希值
  • 每条记录长度固定为20byte
  • producer发送消息后,先保存到commitLog,再异步建立该条消息对应的topic + queue对应的ConsumerQueue索引
  • 第三部分的Hash(tag)是服务端过滤消息的重要依据

consumer如何订阅消息

注册订阅信息

  • consumer订阅时,会将订阅信息注册到到服务端
  • 保存订阅信息的是Map类,key为topic,value主要是tag
  • subVersion取当前时间

    这里的key是topic,subVersion版本号,这两点很关键!后面有用到!

拉取消息并过滤

  • 拉取消息时,首先从服务端获取订阅关系,得到tag的hash集合codeSet
  • 然后从ConsumerQueue获取一条记录,判断记录的hashCode是否在codeSet中,以达到消息过滤的目的,决定是否将该消息发送给consumer
  • 总之一句话:tag决定了消息是否发到客户端

消息过滤

服务端过滤

  • 过滤:tag的hash值过滤
  • 优点:
    • 减少不必要消息占用流量
  • 缺点:
    • Hash存在冲突,过滤不完全准确

客户端过滤

  • 服务端过滤存在不准确性,客户端再次精确过滤
  • 客户度过滤:tag的字符串值做对比。不相等的不返回给消费者

原因总结

  • 同一个consumer group的订阅关系,保存在RebalanceImpl类的Map中。key为topic
  • 不同的消费者启动后,依次注册订阅关系,因为tag不一样,导致Map中同一topic的tag被覆盖。比如:消费者1订阅tag1,消费者2订阅tag2。最后map中只保存tag2.
  • 过滤的核心是是tag,tag被更新,过滤条件被改变。服务端过滤后只返回tag2的消息
  • 客户端接收消息后,再次过滤。先启动的消费者1订阅tagA,但是服务端返回tag2,所以消费者1收不到任何消息。消费者2能收到一半的消息(集群模式,假设消息平均分配,另外一半分给tag2)

源码分析

订阅关系数据结构

消费者1启动时注册的订阅关系

消费者2后启动覆盖订阅关系

服务端过滤时取出ConsumerQueue的Hash(tag)

对比消息的Hash(tag)和之前保存的订阅关系

客户端过滤

转载于:https://juejin.im/post/5cfa7065f265da1ba647dddd

你可能感兴趣的文章
12.3、bash脚本循环语句
查看>>
用C语言实现“智障”的棋盘游戏
查看>>
JavaScript的对象——灵活与危险
查看>>
如何看待sds?
查看>>
隔行换色
查看>>
-- 小白python2之函数
查看>>
如何不用那么担心成为一个坏程序员
查看>>
基于SSM的驾校预约报名管理系统-java驾校预约报名管理系统
查看>>
#8 bash的颜色显示规则
查看>>
用python写的判断质数和登录程序升级版
查看>>
18.6 负载均衡集群介绍 18.7 LVS介绍 18.8 LVS调度算法 18.9/18.10 L
查看>>
Apache安装部署
查看>>
CCNA网络技术实验手册:Cisco IOS备份与升级
查看>>
相关VB.NET文件对象基础知识讲解
查看>>
简单描述Servlet Filter(过滤器) 相关知识
查看>>
生成自增的编号,生成订单号
查看>>
SqlSever2005 一千万条以上记录分页数据库优化经验总结【索引优化 + 代码优化】一周搞定...
查看>>
企业内部IT一体化系列之四:WEB平台 SharePoint服务配置
查看>>
ksh里三个月之外的文件移动脚本
查看>>
MSDN Windows8 中文版 下载地址
查看>>