线上MQ消息积压了怎么处理
线上MQ消息积压了怎么处理
面试官您好,线上 MQ 消息积压我会严格按照 "先止损、再排查、后预防"的三步法来处理,核心原则是优先保证核心业务可用,再逐步定位和解决根本问题,绝对不能先花半小时查原因而让业务一直挂着。
🚨 第一步:紧急止损(10 分钟内必须完成)
这是线上问题的第一优先级,先把业务救回来再说!
📊 紧急处理流程图
关键操作要点
1.优先扩容消费端 📈
- 核心限制:Kafka/RocketMQ 中一个分区只能被一个消费线程消费,所以消费线程数最多等于分区数
- 操作:先临时增加消费组实例数,再调大单个实例的消费线程池核心数
2.降级非核心业务 ⚡
- 立即关闭日志、统计、推送等非核心消息的消费,把 CPU / 内存 / 数据库连接资源全部让给核心业务
- 例子:电商大促时,先停掉用户行为分析和商品推荐的消息消费
3.消息临时转储 📦
- 当积压量达到百万级以上时,直接消费会拖垮整个 MQ 集群
- 方案:写个简单脚本把消息先转存到 Redis/MySQL/OSS,等业务高峰过后再慢慢回放消费
4.跳过死信消息 💀
- 如果有大量重复失败的死信消息,先临时跳过,避免阻塞正常消息的消费
- 事后再单独拉取死信队列进行人工或自动处理
🔍 第二步:根因排查(业务恢复后立即进行)
从生产端、MQ 集群、消费端三个方向逐一排查,90% 以上的问题都出在消费端。
📋 常见根因排查对照表
| 问题方向 | 典型现象 | 快速排查方法 |
|---|---|---|
| 生产端突发流量 | 生产 TPS 突然飙升 5-10 倍,消费速度跟不上 | 查看监控面板的生产速率曲线 检查是否有大促、爬虫或批量任务 |
| 消费端性能瓶颈 | 消费 TPS 远低于生产 TPS,CPU/IO 高 | 查看消费端日志是否有慢 SQL / 第三方超时 jstack 看线程是否阻塞在数据库调用 |
| 消费逻辑异常 | 消费速度为 0,大量报错日志 | 检查是否有死循环、空指针、幂等性问题 查看是否有消息重复消费导致的死循环 |
| MQ 集群故障 | 所有消费组都积压,生产也变慢 | 查看 broker 的磁盘使用率、CPU、内存 检查是否有 broker 宕机或网络分区 |
| 消息体过大 | 单条消息超过 1MB,网络传输慢 | 查看消息大小分布 检查是否有大对象序列化问题 |
容易忽略的坑
- 消费端数据库连接池耗尽,导致所有消费线程都在等待连接
- 第三方接口超时时间设置过长(比如 30 秒),导致消费线程被长时间阻塞
- 消费端线程池队列过长,导致任务堆积在内存中,看起来消费速度很慢
💻 核心代码实现(技术亮点标注)
1. 高性能消费端线程池配置(RocketMQ)
@Configuration
public class RocketMQConsumerConfig {
// 技术亮点1:根据分区数动态配置线程数,避免资源浪费
// 假设topic有16个分区,每个实例最多配置16个消费线程
@Value("${rocketmq.consumer.thread-nums:16}")
private int consumerThreadNums;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQPushConsumer orderConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order_topic", "*");
// 技术亮点2:使用JDK21虚拟线程,大幅提升并发处理能力
ThreadFactory virtualThreadFactory = Thread.ofVirtual()
.name("order-consumer-", 0)
.factory();
// 技术亮点3:合理配置线程池参数,避免OOM和任务堆积
consumer.setConsumeThreadMin(consumerThreadNums);
consumer.setConsumeThreadMax(consumerThreadNums);
// 关键:队列大小设置为0,使用SynchronousQueue直接提交任务
consumer.setConsumeMessageThreadPoolQueue(new SynchronousQueue<>());
// 使用虚拟线程工厂
consumer.setConsumeThreadFactory(virtualThreadFactory);
// 技术亮点4:批量消费,一次拉取32条消息,减少网络IO
consumer.setConsumeMessageBatchMaxSize(32);
consumer.setPullBatchSize(32);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
// 批量处理消息
orderService.batchProcessOrders(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消费消息失败", e);
// 技术亮点5:失败重试策略,最多重试3次,避免死循环
if (context.getReconsumeTimes() >= 3) {
log.error("消息重试超过3次,转入死信队列: {}", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
return consumer;
}
}2. 消息临时转储工具(紧急止损核心代码)
@Component
public class MessageDumpService {
@Autowired
private StringRedisTemplate redisTemplate;
// 技术亮点:先写转储再提交offset,保证消息绝对不丢失
public void dumpMessages(List<MessageExt> msgs) {
String dumpKey = "mq:dump:order:" + System.currentTimeMillis();
List<String> messageBodies = msgs.stream()
.map(MessageExt::getBody)
.map(String::new)
.toList();
// 1. 原子性写入Redis
redisTemplate.opsForList().rightPushAll(dumpKey, messageBodies);
// 2. 设置过期时间,避免内存泄漏
redisTemplate.expire(dumpKey, Duration.ofDays(7));
log.info("成功转储{}条消息到Redis,key: {}", msgs.size(), dumpKey);
}
// 事后回放消息
public void replayMessages(String dumpKey) {
while (true) {
// 每次弹出10条消息处理
List<String> messages = redisTemplate.opsForList().leftPop(dumpKey, 10);
if (CollectionUtils.isEmpty(messages)) {
break;
}
try {
orderService.batchProcessOrderBodies(messages);
} catch (Exception e) {
log.error("回放消息失败", e);
// 失败的消息重新放回队列尾部
redisTemplate.opsForList().rightPushAll(dumpKey, messages);
}
}
}
}3. 基于 Redis 的幂等性实现(防止重复消费)
@Component
public class IdempotentService {
@Autowired
private StringRedisTemplate redisTemplate;
// 技术亮点:使用Redis SETNX+过期时间实现高效幂等性
// 性能比数据库唯一索引高10倍以上
public boolean isProcessed(String messageId) {
String key = "mq:idempotent:" + messageId;
// SETNX: 只有key不存在时才设置成功
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofHours(24));
return Boolean.FALSE.equals(result);
}
}
// 使用方式
if (idempotentService.isProcessed(msg.getMsgId())) {
log.info("消息已处理,跳过: {}", msg.getMsgId());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}🎯 技术难点与解决方案对照表
| 技术难点 | 核心挑战 | 解决方案 | 技术亮点 |
|---|---|---|---|
| 分区数与消费能力的矛盾 | Kafka/RocketMQ 中一个分区只能被一个消费线程消费,分区数固定时消费能力有上限 | 1. 紧急:降级非核心业务 + 消息转储 2. 长期:提前规划分区数(按峰值 TPS/1000 计算) 3. 动态:支持在线扩容分区 | 在线扩容分区时保证消息顺序性,使用 "先扩容再迁移" 策略 |
| 消息顺序性与并发消费的冲突 | 并发消费会破坏消息顺序,但单线程消费吞吐量太低 | 1. 局部顺序:按业务 ID(如订单号)哈希分区 2. 分段锁:同一业务 ID 的消息串行处理 3. 版本号:使用乐观锁保证最终一致性 | 基于 Redis 分布式锁实现同一业务 ID 的串行处理,兼顾顺序性和并发性 |
| 消息转储与不丢消息的保证 | 转储过程中如果服务宕机,会导致消息丢失 | 采用 "两阶段提交" 策略: 1. 先将消息写入转储存储 2. 写入成功后再提交 offset 3. 转储失败则不提交 offset,下次重新拉取 | 使用事务消息或本地事务表保证转储和 offset 提交的原子性 |
| 死信消息的自动化处理 | 死信消息如果不及时处理,会越积越多,最终导致磁盘满 | 1. 配置死信队列,自动转发失败超过 3 次的消息 2. 定时任务自动重试死信消息(指数退避策略) 3. 超过最大重试次数的消息转人工处理 | 指数退避重试策略:第 1 次 1 分钟,第 2 次 5 分钟,第 3 次 30 分钟 |
| 大流量下的幂等性问题 | 高并发下重复消费会导致数据不一致,数据库唯一索引性能差 | 1. 优先使用 Redis SETNX 实现幂等性 2. 数据库唯一索引作为兜底 3. 业务层面设计天然幂等的操作 | 基于消息 ID + 业务 ID 的双重幂等性校验,兼顾性能和可靠性 |
| 消费端慢 SQL 导致的雪崩 | 一条慢 SQL 会阻塞整个消费线程池,导致所有消息都无法消费 | 1. 给所有数据库操作设置超时时间 2. 使用 Hystrix/Sentinel 熔断慢 SQL 3. 单独线程池处理数据库操作 | 线程池隔离:核心业务和非核心业务使用不同的线程池 |
🛡️ 第三步:长效预防措施(避免下次再发生)
1.生产端流量控制 🚦
- 接入 Sentinel/Hystrix 限流组件,限制生产端的消息发送速率
- 对批量任务进行削峰填谷,避免凌晨突然产生百万级消息
2.消费端性能优化 ⚙️
- 批量消费:一次拉取 10-50 条消息处理,减少网络 IO 次数
- 异步化:把非核心逻辑放到单独的线程池处理
- 线程池调优:根据业务场景调整核心线程数和队列大小
3.完善监控告警体系 📊
- 核心监控指标:消息积压量、生产 TPS、消费 TPS、消费延迟时间
- 告警阈值:积压量超过 1000 条普通告警,超过 10000 条紧急电话告警
- 多维度告警:短信、企业微信、电话
4.死信队列自动化处理 🤖
- 配置死信队列,自动转发失败超过 3 次的消息
- 写定时任务自动重试死信消息,超过重试次数的转人工处理
5.定期灾备演练 🧪
- 每季度模拟一次消息积压场景,验证紧急处理流程
- 测试扩容、降级、转储等操作的耗时和效果
✨ 面试加分项(让面试官眼前一亮的点)
- 准确说出分区数与消费线程数的关系:Kafka 中一个分区只能被一个消费线程消费,所以消费线程数最多等于分区数
- 提到消息幂等性:在扩容和重试的时候,必须保证消息不会被重复消费
- 提到消息顺序性:如果业务要求消息顺序,扩容的时候不能随便增加消费者
- 提到灰度发布:消费端上线前先灰度发布 10% 的流量,避免全量上线导致的大面积积压
- 提到MQ 集群容量规划:提前根据业务增长规划分区数和 broker 数量,避免临时扩容
💡 面试官高频追问预判
追问 1:如果扩容到分区数上限还是积压怎么办?
答:先降级非核心业务,然后临时转储消息,事后再优化消费逻辑或者增加分区数
追问 2:消息转储的时候怎么保证不丢消息?
答:采用 "先写转储成功,再提交 offset" 的方式,确保消息至少被处理一次
追问 3:怎么避免死信消息的产生?
答:做好消息幂等性,增加合理的重试次数,对不可恢复的异常直接标记为死信
面试线程实录
好的,我们直接切到面试现场👇
面试官:
我们聊一道场景设计题——线上MQ消息积压了,你会怎么处理?结合你的实战经历说说。
候选人:
好的面试官😎。我之前在电商大促就碰到过,RocketMQ突然堆了800万条消息,监控一片红🚨。当时我是按“先止血 → 再排查 → 后优化”这九个字来处理的。
面试官:
嗯,先说说止血怎么做的。
候选人:
第一步,紧急扩容消费者。Consumer Group没打满就直接加机器,RocketMQ里只要实例数不超过读队列数,加多少台就能线性提升多少倍消费能力。当时我们16个读队列只起了4个消费者,扩到16台,消费速度当场翻倍⚡。
面试官:
那万一读队列已经打满,消费者加不进去呢?
候选人:
这就要看场景了。如果可以接受短暂乱序,直接动态增加读写队列,再补消费者。
但像订单这种严格要求顺序的,我就建一个紧急的“泄洪Topic”,把积压消息通过脚本快速搬运过去,然后用20台临时机器全力消费,跟原业务完全隔离——这招业内叫临时队列泄洪。
我画个流程给您看下👇
面试官:
思路很清楚。除了扩容,止血还有别的手段吗?
候选人:
有的,业务降级 + 丢弃非关键消息。
我当时大喊一声 (╯°□°)╯︵ ┻━┻ 然后果断上线了开关,把埋点日志、用户行为这类非核心消息直接return掉并记录offset,集中资源保订单和支付。核心积压10分钟就开始消了。
面试官:
很好,止血稳住了。接下来你怎么排查根因?否则还会复现对吧。
候选人:
对,90%的积压都是消费慢导致的。我一般会用 Arthas trace 直接抓消费链路的耗时分布,当时一抓就发现消费一条消息居然花了3秒,全是调下游优惠券接口卡住的,P99超时严重。
除了这个,我还会并行排查几个常见瓶颈:
| 瓶颈类型 | 表象 | 排查手段 |
|---|---|---|
| 慢SQL / 缺索引 | 消费线程卡在JDBC | 慢查询日志、Arthas trace |
| 下游RPC超时 | 大量TimeoutException | SkyWalking、调用链统计 |
| 频繁GC | 消费时不时停顿 | jstat -gc、GC日志 |
| 锁竞争激烈 | 线程堆积在synchronized | jstack 或 Arthas thread -b |
| 消息处理串行 | 单条消费耗时极长 | 查看Consumer拉取参数 |
面试官:
你当时那个优惠券接口是怎么解决的?
候选人:
上游加了一层本地缓存+降级兜底,接口挂了直接走缓存或者默认券,单条消息处理从3秒降到20ms,积压迅速就消下去了📉。
面试官:
做完这些,你们还做了什么长期优化?
候选人:
做了几件长效的事:
- 批量消费:调整
consumeMessageBatchMaxSize,减少网络往返。 - 提升并发度:调整
consumeThreadMin/Max,把线程池拉上来。 - 异步化落库:消费后写DB改成异步+队列,不拖慢消费主流程。
- 监控与限流:生产端加限流,积压量实时告警,提前感知。
面试官:
不错。如果积压的是顺序消息,上面这些操作有什么坑吗?
候选人:
顺序消息扩容要特别小心,直接增加读写队列会导致同一个key的消息被分到不同队列,顺序就乱了。一般会暂停非顺序消费,把资源全部倾斜给顺序队列,或者按业务key水平拆分,增加分区数,但要保证单key永远进同一队列。
面试官:
还有什么要补充的经验吗?
候选人:
血泪经验😅:大促前一定要压测消费链路,很多团队只压生产端,结果消费链路被拖垮,一翻车就是大事故。我们有次差点耽误双十一,后来每次压测都带消费环节。
面试官:
刚才你提到了扩容消费者、降级、异步化等操作,能展示一段你们当时处理积压的核心代码吗?我想看看有没有技术亮点。
候选人:
好的,我挑三段最有代表性的代码👇
亮点① 批量消费 + 动态线程池
RocketMQ 默认是单条拉取,我们改成了批量消费,并且线程池参数做到可动态调整,不用重启就能应对突发积压。
// 消费者核心配置
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_group");
consumer.setNamesrvAddr("namesrv:9876");
consumer.setConsumeMessageBatchMaxSize(64); // 一次拉取64条
consumer.setPullBatchSize(32); // 长轮询拉取批次
// 动态线程池,支持运行时调整
ThreadPoolExecutor consumeExecutor = new ThreadPoolExecutor(
20, 80, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("consume-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝时由拉取线程同步执行,防止丢消息
);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 线程池异步处理,主线程快速返回 CONSUME_SUCCESS 占位
consumeExecutor.submit(() -> {
processBatch(msgs); // 批量处理
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});技术亮点:
用 CallerRunsPolicy 实现背压,当线程池满了,就由拉取线程自己同步执行,宁可消费慢一点,也绝不丢消息 🛡️。同时配合 Apollo 动态下发核心线程数,大促期间一条指令就能提速。
亮点② 降级开关 + Offset 安全丢弃
非核心消息紧急丢弃时,必须保证不丢 offset,否则重启后会重复消费大量无用消息。
@Value("${degrade.switch:false}")
private boolean degradeSwitch;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
if (degradeSwitch) {
// 只处理订单/支付等核心 Topic,其余直接确认 + 记录流水
for (MessageExt msg : msgs) {
if (!isCoreTopic(msg.getTopic())) {
log.warn("降级丢弃非核心消息, msgId={}, offset={}",
msg.getMsgId(), msg.getQueueOffset());
// 仍然返回 SUCCESS,让 Broker 确认消费,不会重复投递
} else {
processCore(msg);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 正常消费逻辑...
}技术亮点:
非核心消息直接 CONSUME_SUCCESS,既释放了积压,又不会有重复投递。同时日志记录 offset,事后可以回溯补数据。这个开关我们挂在配置中心,秒级生效 ⚡。
亮点③ 异步落库 + 本地队列削峰
消费消息后需要写 DB,同步写入是最大的瓶颈,我们用内存队列异步批量写库,消费和持久化解耦。
// 批量写库任务
private final LinkedBlockingQueue<OrderEvent> buffer = new LinkedBlockingQueue<>(10000);
@PostConstruct
public void init() {
// 单独线程批量刷库,200ms 或满 100 条触发
ScheduledExecutorService flusher = Executors.newSingleThreadScheduledExecutor();
flusher.scheduleAtFixedRate(() -> {
List<OrderEvent> batch = new ArrayList<>();
buffer.drainTo(batch, 100);
if (!batch.isEmpty()) {
orderDao.batchInsert(batch);
}
}, 0, 200, TimeUnit.MILLISECONDS);
}
public void processBatch(List<MessageExt> msgs) {
for (MessageExt msg : msgs) {
OrderEvent event = parse(msg);
if (!buffer.offer(event)) { // 队列满了
orderDao.insert(event); // 降级为同步写入,背压到消费端
}
}
}技术亮点:
LinkedBlockingQueue + 定时批量刷盘,将磁盘 IO 从消费主链路剥离。队列满时自动降级为同步写,保证不丢数据,也限制了内存占用 📉。
面试官:
代码很有实战感。那这个场景,你总结下主要的技术难点和解决方案吧。
候选人:
我理一个表,一目了然 👇
| 技术难点 | 难点描述 | 解决方案 |
|---|---|---|
| 快速泄洪不丢消息 | 消费者已达上限,无法加机器,直接扩容队列可能破坏顺序 | 临时 Topic 泄洪 + 脚本搬运;非顺序消息动态扩队列 |
| 顺序消息扩容 | 增加写队列会导致单 key 路由到多个队列,顺序乱序 | 暂停非顺序消费,按业务 key 水平拆分分区,保证单 key 永远进同一队列 |
| 消费瓶颈定位 | 表象是积压,根因可能是慢 SQL、RPC 超时、GC、锁竞争 | 全链路追踪 + Arthas trace 精准定位耗时点 |
| 降级与重复消费取舍 | 丢弃非核心消息后,重复消费会导致大量无用计算 | 直接返回 CONSUME_SUCCESS 确认消费,记录 offset 供事后回溯 |
| 扩容平滑性 | 修改队列数可能需要重启 Consumer,导致消费短暂中断 | 利用 RocketMQ 动态加减队列特性,客户端无感;配合滚动重启 |
| 生产端连锁雪崩 | 消费慢导致 Broker 写满,进而阻塞生产者 | 生产端异步发送 + 限流,保护 Broker 写权限;消费端快速降级 |
| 消费线程池调优 | 线程数太少消费慢,太多可能 OOM 或上下文切换开销大 | 动态线程池 + 压测定基线,配合 CallerRunsPolicy 实现背压 |
| 事后数据补全 | 降级丢弃的消息需要补回业务数据 | 消费时打印 key 和 offset 日志,用离线脚本批量回放 |
面试官:
总结得很到位,尤其是动态线程池和异步落库那段,能在不丢消息的前提下把吞吐量提上去,确实是高并发场景的核心解法 💪
候选人:
谢谢面试官,这些都是在几次大促故障里用血泪换来的 😅,每次复完盘都会沉淀一个可复用的组件,现在团队新人一上来就能用。
