如何处理重复消息
约 1016 字大约 3 分钟
2025-04-10
➡️ 如何处理重复消息
重复消息是如何出现的?
- 生产者将消息放入
Broker
等待Broker
的响应时,由于网络原因生产者没有收到,然后生产者又重发了一次,此时消息就重复了。 - 假设消费者拿到消息消费,业务逻辑走完了,事物提交了,此时要更新
Consumer offest
,然后消费者挂了,另一个消费者顶上,此时Consumer offest
没有更新,于是又拿一遍刚才的消息,业务又执行一遍。
- 生产者将消息放入
如何解决重复消费消息?幂等。
- 通过version即版本号控制,对比消息中的版本号和数据库中的版本号
- 记录关键的key。如订单ID等。
什么是幂等?
理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的。
➡️ 如何处理消息堆积
原因
生产者的生产速度与消费者的速度不匹配。
方案
先定位消费慢的原因,如果是bug则处理bug;如果是本身消费能力较弱,优化小消费逻辑,比如之前是一条一条消费处理的,优化为批量处理;水平扩容,增加
Topic
队列数和消费者数。注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。你是将接受到的消息写入内存队列之后,然后就返回响应给
Broker
,然后多线程向内存队列消费消息,假设此时消费者宕机之后内存队列里还未消费的消息也就丢了。
➡️ 如何写个消息中间件
首先需要明确地提出 消息中间件的几个重要角色,分别是生产者、消费者、Broker、注册中心。
简述下消息中间件数据流转过程,无非就是 生产者生产消息,发送至Broker,Broker可以暂缓消息,然后消费者再从Broker获取消息,用于消费。
而注册中心用于服务的发现包括:Broker的发现、生产者的发现、消费者的发现,当然还包括下线,可以说服务的高可用离不开注册中心。
然后开始讲述实现要点: 各个模块的通信基于Netty然后自定义协议来实现 ,注册中心可以利用Zookeeper、consul、eureka、nacos等等,也可以像RockerMQ自己实现简单的namesrv
为了考虑扩容和整体的性能,采用分布式思想,就像kafka一样采取分区理念, 一个Topic分为多个partition,并且为了保证数据可靠性,采取多副本存储,即Leader和follower ,根据性能和数据可靠的权衡提供异步和同步的输盘存储
还可以利用 选举算法保证Leader挂了之后follower可以顶上 ,保证消息队列的高可用
为了提供消息的可靠性, 利用本地文件系统来存储消息 ,并且采用顺序写的方式来提高性能
根据消息队列的特性 利用内存映射、零拷贝进一步提升性能 ,还可以利用像kafka这种批处理思想提高整体的吞吐量。
衍生问题
- Netty是什么?各注册中心之间的选型对比?
2.选举算法,Bully 算法、Raft 算法、ZAB 算法等等?
3.分区,这个分区和 RocketMQ 的队列有什么不同?具体分区要怎么实现?
4.顺序写,为什么要顺序写啊?内存映射和零拷贝又是什么啊?RocketMQ 和 Kafka 用了哪个吗?
5.消息的写入如何存储、消息的索引如何生成等等?有没有看过消息中间件的源码?