目录
MQ(Message%20Queue)%E5%9F%BA%E6%9C%AC%E6%A6%82%E5%BF%B5-toc" name="tableOfContents" style="margin-left:0px">MQ(Message Queue)基本概念
为什么要使用消息队列?
使用消息队列有什么缺点?
如何保证消息不丢失?(如何保证消息的可靠性传输?/如何处理消息丢失的问题?)
MQ%E5%9C%BA%E6%99%AF%EF%BC%9A-toc" name="tableOfContents" style="margin-left:40px">通用的MQ场景:
MQ%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1%EF%BC%9F-toc" name="tableOfContents" style="margin-left:40px">RabbitMQ如何保证消息不丢失?
生产者丢数据
解决方案:
消息队列丢数据
解决方案:
消费者丢数据
解决方案:
MQ%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1%EF%BC%9F-toc" name="tableOfContents" style="margin-left:40px">RocketMQ如何保证消息不丢失?
生产者端
同步发送
异步发送+重试机制
事务消息
Broker端
同步刷盘
主从复制
消费者端
手动提交消费位移
幂等性消费
消费者端不要使用异步消费机制(同步处理消息,再提交offset)
总结
Kafka如何保证消息不丢失?(如何保证消息的可靠性传输?)
生产者层面保证消息不丢失
acks 参数设置(控制消息发送确认机制)
重试机制
Kafka集群层面保证消息不丢失(Broker端)
副本机制(多副本机制)
刷盘策略(控制刷盘的频率)
ISR(In - Sync Replicas)机制(合理配置 ISR)
消费者层面保证消息不丢失
手动提交偏移量(手动提交offset)
MQ%E9%9B%86%E7%BE%A4%E6%8C%82%E4%BA%86%E4%B9%8B%E5%90%8E%E6%80%8E%E4%B9%88%E5%8A%9E%EF%BC%9F-toc" name="tableOfContents" style="margin-left:40px">整个MQ集群挂了之后怎么办?
如何保证消息不被重复消费?(如何保证消息消费的幂等性?/如何保证消息队列的幂等性?)
MQ(Message%20Queue)%E5%9F%BA%E6%9C%AC%E6%A6%82%E5%BF%B5" name="MQ(Message%20Queue)%E5%9F%BA%E6%9C%AC%E6%A6%82%E5%BF%B5">MQ(Message Queue)基本概念
MQ全称 Message Queue(消息队列),是在消息传输过程中保存消息的容器。多用于分布式系统之间进行通信。
为什么要使用消息队列?
异步、解耦、削峰(削峰填谷)。
使用消息队列有什么缺点?
系统可用性降低
系统复杂性增加
一致性问题
如何保证消息不丢失?(如何保证消息的可靠性传输?/如何处理消息丢失的问题?)
我们在使用消息队列的过程中,应该做到消息不能多消费,也不能少消费。每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据。
- 生产者弄丢数据
RabbitMQ:提供 transaction和confirm模式来确保生产者不丢消息;
RocketMQ:同步发送、异步发送+重试、事务消息。
Kafka:acks 参数设置(控制消息发送确认机制,设置参数acks = -1(或 all))、重试机制。
- 消息队列弄丢数据(处理消息队列丢数据的情况,一般是开启持久化磁盘的配置)
消息持久化
- 消费者弄丢数据(消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!)
解决方案:处理消息成功后,手动回复确认消息。
MQ%E5%9C%BA%E6%99%AF%EF%BC%9A" name="%E9%80%9A%E7%94%A8%E7%9A%84MQ%E5%9C%BA%E6%99%AF%EF%BC%9A">通用的MQ场景:
其中,1,2,4三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。
关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。这个是MQ场景都会面对的通用的丢消息问题。
MQ%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1%EF%BC%9F" name="RabbitMQ%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1%EF%BC%9F" style="margin-left:0.0001pt; margin-right:0px; text-align:justify">RabbitMQ如何保证消息不丢失?
生产者丢数据
生产者发送消息到RabbitMQ服务器的过程中出现消息丢失。可能是网络波动未收到消息,又或者是服务器宕机。
解决方案:
Confirm消息确认机制(生产者),即采⽤RabbitMQ的发布确认( Publisher Confirms )模式。
Confirm模式是RabbitMQ提供的一种消息可靠性保障机制。当生产者通过Confirm模式发送消息时,它会等待RabbitMQ的确认,确保消息已经被正确地投递到了指定的Exchange中。
消息正确投递到Queue时,会返回ack。
消息没有正确投递到Queue时,会返回nack,你可以进行重试操作。如果Exchange没有绑定Queue,也会出现消息丢失。
使用方法:
- 生产者将Channel设置为Confirm模式(通过调⽤ channel.confirmSelect()完成)。
- 发送消息后,通过添加addConfirmListener方法,监听消息的确认状态。
处理Ack和Nack的代码如下所示:
消息队列丢数据
RabbitMQ服务器消息持久化出现消息丢失。消息发送到RabbitMQ之后,未能及时储存完成持久化,RabbitMQ服务器出现宕机重启,消息出现丢失。
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和Confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,RabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
解决方案:
消息持久化机制(RabbitMQ服务)
持久话机制是将消息存储到磁盘,以保证在RabbitMQ服务器宕机或重启时,消息不会丢失。
使用方法:
- 生产者通过将消息的delivery_mode属性设置为2,将消息标记为持久化。
- 队列也需要进行持久化设置,确保队列在RabbitMQ服务器重启后仍然存在。经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。
消费者丢数据
消费者拉取消息过程以及拿到消息后出现消息丢失。消费者从RabbitMQ服务器获取到消息过程出现网络波动等问题可能出现消息丢失;消费者拿到消息后但是消费者未能正常消费,导致丢失,可能是消费者出现处理异常又或者是消费者宕机。
消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息,这时RabbitMQ会立即将消息删除,这种情况下,如果消费者出现异常而未能处理消息,就会丢失该消息。
解决方案:
手动ACK机制(消费者)
ACK机制用于确保消息被正确消费。当消息被消费者成功处理后,消费者发送确认(ACK)给RabbitMQ,告知消息可以被移除。
使用方法:
- 手动开启ACK机制,通过将auto_ack参数设置为false,手动控制消息的ACK。
在RabbitMQ中,ACK机制是默认开启的(自动ACK)。当消息被消费者接受后,会立即从队列中删除,除非消费者发送异常。
MQ%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1%EF%BC%9F" name="RocketMQ%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1%EF%BC%9F" style="margin-left:0.0001pt; margin-right:0px; text-align:justify">RocketMQ如何保证消息不丢失?
RocketMQ通过多层面的机制来确保消息的可靠性,包括生产者端、broker端和消费者端。
生产者端
同步发送
同步发送是最可靠的发送方式,它会等待Broker的确认响应。
同步发送会返回 4 个状态码:
SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功。
FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。
FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。
SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。
根据返回的状态码,可以做消息重试,这里设置的重试次数是 3。
消息重试时,消费端一定要做好幂等处理。
异步发送+重试机制
异步发送通过回调来处理发送结果,并可以设置重试次数。
异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。
事务消息
生产者使用事务消息机制保证消息不丢失。
RocketMQ支持事务消息,整体流程如下图:
Producer 发送 half 消息;
Broker 先把消息写入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的队列,之后给 Producer 返回成功;
Producer 执行本地事务,成功后给 Broker 发送 commit 命令(本地事务执行失败则发送 rollback);
Broker 收到 commit 请求后把消息状态更改为成功并把消息推到真正的 topic;
Consumer 拉取消息进行消费。
代码如下:
电商订单场景,事务消息机制流程图:
事务消息机制的作用:
整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业务的分布式事务一致性问题。而事务一致性问题一直以来都是一个非常复杂的问题。而RocketMQ的事务消息机制,实际上只保证了整个事务消息的一半,它保证的是订单系统下单和发消息这两个事件的事务一致性,而对下游服务的事务并没有保证。但是即便如此,也是分布式事务的一个很好的降级方案。目前来看,也是业内最好的降级方案。
Broker端
同步刷盘
通过配置broker.conf文件,启用同步刷盘:
我们可以简单的把RocketMQ的刷盘方式 flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了。
消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加配置:flushDiskType=SYNC_FLUSH。
主从复制
配置主从架构,并设置同步复制:
Broker 为了保证高可用,采用一主多从的方式部署。如下图:
消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即master 收到消息后,不等 slave 节点复制消息就直接Producer 返回成功。
这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加配置:
brokerRole=SYNC_MASTER
改为同步复制后,消息复制流程如下:
slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;
master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;
slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;
master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。
消费者端
手动提交消费位移
使用手动提交可以确保消息被正确处理后再提交位移(offset)。
先业务后ack(先消费,消费成功后再提交),简单来说就是,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatuS.CONSUME_SUCCESS;
如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。
Consumer 重试
Consumer 消费失败,这里有 3 种情况:
返回 RECONSUME_LATER
返回 null
抛出异常
Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。
注意:
Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。
重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。
Consumer 端一定要做好幂等处理。
其实重试 3 次都失败就可以说明代码有问题,这时 Consumer 可以把消息存入本地,给 Broker 返回CONSUME_SUCCESS 来结束重试。代码如下:
幂等性消费
在消费端实现幂等性处理,确保重复消费不会导致业务问题。
消费者端不要使用异步消费机制(同步处理消息,再提交offset)
正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。但是也会有下面这种情况会造成服务端消息丢失:
这种异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。
总结
RocketMQ通过以下方式保证消息不丢失:
1.生产者端:同步发送、异步发送+重试、事务消息。
2.Broker端:同步刷盘、主从复制。
3.消费者端:手动提交消费位移、消费重试、幂等性消费。
通过这些机制的组合,RocketMQ能够在各个环节保证消息的可靠性,极大地降低了消息丢失的风险。在实际应用中,可以根据业务需求选择合适的配置和实现方式,以在可靠性和性能之间取得平衡。
重试机制
- 生产者端重试: 如果消息发送失败,生产者可以设置重试次数和重试策略。在重试期间,如果仍然无法发送成功,生产者可以选择记录日志或将消息写入本地数据库进行补偿。
- 消费者端重试: 如果消费者在处理消息时发生异常,RocketMQ 会自动将该消息重新放回队列,等待下次重试。消费者可以通过配置最大重试次数来控制消息的重试策略。
Kafka如何保证消息不丢失?(如何保证消息的可靠性传输?)
Producer通过网络发送消息到Kafka集群,然后Consumer来进行消费,如下图:
服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。
为了保证消息在传递过程中,消息不会丢失,Kafka设计了很多重要机制来保证消息的可靠性。
生产者层面保证消息不丢失
acks 参数设置(控制消息发送确认机制)
Kafka 生产者发送消息时,可以通过设置生产端的 acks 参数来控制消息发送确认机制。
acks = 0:生产者在发送消息后不会等待任何来自 Kafka 集群的确认。这种模式下,消息可能在发送过程中丢失,比如网络问题导致消息根本没到达 Kafka 服务器,但它能提供最高的吞吐量,适用于对消息丢失不太敏感的场景。
acks = 1:生产者发送消息后,只要消息成功写入 Kafka 分区的主副本(leader replica),就会收到确认。不过,如果在消息写入主副本后,但还没来得及同步到其他副本(follower replica)时,主副本所在节点宕机,消息可能会丢失。示例代码:
acks = -1(或 all):生产者会等待所有同步副本(in - sync replicas)都成功写入消息后才收到确认。这是最安全的模式,能最大程度保证消息不丢失,但会影响吞吐量。
重试机制
当消息发送失败时,生产者可以设置重试机制。可以通过以下方式配置:
同时,可以结合自定义的错误处理逻辑,例如:
Kafka集群层面保证消息不丢失(Broker端)
副本机制(多副本机制)
Kafka通过副本(replica)的方式来实现数据冗余。在Kafka中,每个topic都可以配置副本数量,副本数量越多,数据可靠性越高,但会占用更多的存储空间和网络带宽。每个分区(partition)也可以有多个副本,其中一个是主副本(leader replica),其余是从副本(follower replica)。主副本负责处理读写请求,从副本则定期从主副本同步数据。当主副本不可用时,会从从副本中选举出新的主副本。
针对每个Partitition,会选举产生一个Leader节点,负责响应客户端的请求,并优先保存消息。而其它节点则作为Follower节点,负责备份Master节点上的消息。
刷盘策略(控制刷盘的频率)
Kafka 的消息是先写入内存中的缓冲区,然后定期刷盘。可以通过配置 flush.messages 和 flush.ms 参数来控制刷盘的频率。例如,设置 flush.messages = 1 表示每写入一条消息就刷盘一次,这样可以保证消息的持久化,但会降低性能;设置 flush.ms = 1000 表示每 1 秒刷盘一次,在性能和可靠性之间进行了平衡。
ISR(In - Sync Replicas)机制(合理配置 ISR)
在Kafka中,针对每个Partition,Kafka会维护一个ISR列表,里面记录当前处于同步状态的所有Partition,并通过ISR机制确保消息不会在Master故障时丢失。
ISR 集合中的副本与 Leader 分区保持同步,如果某个 Follower 分区落后 Leader 分区太多,会被踢出 ISR 集合。可以通过设置 min.insync.replicas 参数来指定 ISR 集合中最少需要保持同步的副本数。当生产者设置 acks = all 时,如果 ISR 集合中的副本数小于 min.insync.replicas,生产者会收到错误信息,从而避免消息丢失。
ISR 是与主副本保持同步的副本集合。只有在 ISR 中的副本都成功写入消息后,生产者才会收到确认(当 acks=-1 或 all 时)。如果一个副本长时间未与主副本同步(可通过参数 replica.lag.time.max.ms 配置),它会被移出 ISR。
消费者层面保证消息不丢失
手动提交偏移量(手动提交offset)
消费者确认机制:手动提交offset。
如果消费者这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。
Kafka 消费者在消费消息时,会记录消费的偏移量(offset),表示已经消费到的消息位置。默认情况下,消费者会自动提交偏移量,这种方式可能会导致消息丢失。建议使用手动提交偏移量的方式,在消息处理完成后再提交偏移量,确保消息不会因为消费者在处理过程中崩溃而丢失。可以通过设置 enable.auto.commit 参数为 false 来关闭自动提交偏移量,然后使用 commitSync() 或 commitAsync() 方法手动提交偏移量。
Kafka 消费者可以通过手动提交偏移量(offset)来精确控制消息的消费进度。在消费者成功处理消息后,手动提交偏移量,确保消息不会被重复消费或丢失。
示例:
Kafka消费者在处理完消息后,手动提交offset,向Kafka Broker发送确认消息,表示消息已经被成功处理。如果消费者未发送确认消息,则Kafka Broker会保留消息并等待消费者再次拉取。这样可以保证消息被正确处理且不会重复消费。
MQ%E9%9B%86%E7%BE%A4%E6%8C%82%E4%BA%86%E4%B9%8B%E5%90%8E%E6%80%8E%E4%B9%88%E5%8A%9E%EF%BC%9F" name="%E6%95%B4%E4%B8%AAMQ%E9%9B%86%E7%BE%A4%E6%8C%82%E4%BA%86%E4%B9%8B%E5%90%8E%E6%80%8E%E4%B9%88%E5%8A%9E%EF%BC%9F">整个MQ集群挂了之后怎么办?
整个MQ集群挂了之后准备降级方案。
如果整个集群挂了(集群中所有的节点都挂了),在这种情况下,MQ相当于整个服务都不可用了,那它本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送MQ不成功,那就只能另外给找地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往MQ发送。这样等MQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的。
如何保证消息不被重复消费?(如何保证消息消费的幂等性?/如何保证消息队列的幂等性?)
保证消息的唯一性(给每个消息一个唯一的标识)。
实际上我们只要保证多条相同的数据过来的时候只处理一条或者说多条处理和处理一条造成的结果相同即可。
一般消费端都是要做消费幂等处理的。
在业务上处理幂等问题。处理幂等问题的关键是要给每个消息一个唯一的标识,可以给每个消息指定一个全局唯一的MessageID,在消费者端针对MessageID做幂等性判断。
1.利用数据库唯一主键约束(消息保存数据库场景)。
2.消息去重判断(可以拿到消息做redis的set的操作,因为你无论set几次结果都是一样的,set操作本来就是幂等操作)。
在实际工作中,最好是能够用一个具有业务意义的数据作为唯一键会更好,这样能更好的防止重复消费问题对业务的影响。