消息队列 - 如何保证消息的顺序性
最后更新于
转载: https://juejin.im/post/5c9b1c155188251d806727b2
如果存在多个消费者,那么就让每个消费者对应一个Queue,然后把要发送的数据全都放到一个Queue,这样就能保证所有的数据只到达一个消费者从而保证每个数据到达数据库都是顺序的。
RabbitMQ:拆分多个Queue,每个Queue一个consumer,就是多一些Queue而已,确实是麻烦点;或者就一个Queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。
Kafka 写入partion时指定一个key,列如订单id,那么消费者从partion中取出数据的时候肯定是有序的,当开启多个线程的时候可能导致数据不一致,这时候就需要内存队列,将相同的hash过的数据放在一个内存队列里,这样就能保证一条线程对应一个内存队列的数据写入数据库的时候顺序性的,从而可以开启多条线程对应多个内存队列。
Kafka:一个topic,一个partition,一个consumer,内部单线程消费,写N个内存Queue,然后N个线程分别消费一个内存Queue即可。
这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复consumer的问题,让他恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。
一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条,1000多万条
所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来
一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的Queue数量。
然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的Queue。
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时Queue的数据。
这种做法相当于是临时将Queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息。
这里我们假设再来第二个坑
假设你用的是RabbitMQ,RabbitMQ是可以设置过期时间的,就是TTL,如果消息在Queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
这个情况下,就不是说要增加consumer消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。
这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。也只能是这样了。
假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次
然后我们再来假设第三个坑
如果走的方式是消息积压在mq里,那么如果你很长时间都没处理掉,此时导致mq都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据!