(no title)
2024.07.23
wlg
℃
后端技术选型 订单超时自动关闭的方案 背景 之前遇到过一个需求,类似于超过一定时间自动关闭订单。拿电商领域的订单举例,比如订单超时自动收货、用户下单后放弃支付超过半小时后订单需要自动关闭,订单关闭后才能进行后续的流程,如解锁库存,通知用户等。依稀记得前几年背过类似的八股文,没想到有一天真能用上,特此做一份整理,如有错误欢迎指正。
一、定时任务轮询数据库 大部分人第一个想到的便是这个方案,思路如下:
生成订单时,记录预计的过期时间(expire_time = now() + 30min
) ,和state一起加上索引,3s轮询数据库,查出超时未付款的订单id
通过定时器(如xxl-job)去轮询数据库,比如3s一次,查询超时未付款的订单idselect id from order where state = '未付款' and now() >= expire_time limit 500
尝试对订单ids加锁,成功的订单ids可以更新订单状态为超时未付款
定时任务有间隔,而且在表数据量较大的情况下查询会变慢,导致订单超时但定时任务可能还未关闭订单,而此时可能又有订单操作。因此在查询订单、更新订单时要检查订单是否超时未付款,超时则关闭订单
优点
实现起来简单,成本低,可靠性强,后续维护和排查问题也方便。算是主流的解决方案。
缺点
精度不够,轮询总需要设定一个时间间隔。当时间间隔很小但数据量很大时,即便有索引也会有较大的db压力。如果间隔设置较大,订单关闭就会有延迟
二、redis过期监听(不可行) 在redis.conf
中设置notify-keyspace-events Ex
,在KeyExpirationEventMessageListener.onMessage
接收消息并处理。
1 2 3 4 5 6 7 8 9 10 11 notify-keyspace-events Ex public class Test extends KeyExpirationEventMessageListener { public Test(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Override public void onMessage(Message message, byte[] bytes) { String orderId = message.toString(); } }
notify-keyspace-events
参数如下,可任意组合用于指定要发送的通知类型,但K
和E
至少2选1
缺点
理论上很美好,可实际生产环境不建议使用,这个方案有较大缺陷
并不是key过期就立刻发送过期通知,延迟是必定存在的。当key被实际删除时才会开始生成过期事件,而实际删除key的时间是不准确的,时机分2种:
惰性删除:访问到某个过期的key时,删除该key
定时删除:定时每隔100ms,检查设置了过期时间的部分key,并删除过期的key
发送的通知并不能像消息队列那样保证送达,即有可能丢消息,比如发通知时服务正好重启。显然可靠性不足
三、redis-zset做延迟队列 Zset是一个有序集合,存储member和score,通过字典(member为key)+跳表(按score排序)实现,可以by字典精确搜索member得到节点,或by跳表搜索score得到节点。这种数据结构正好符合我们的需求。我们将预计过期时间设置为score,订单id设置为member。新建订单时插入,支付后删除节点,未付款的订单存于zset中。每秒轮询redis,通过rangeByScore
获取now() > score
的任务,执行完remove
优点
实现简单易维护
在内存中增删查询节点,且实际未付款订单数量少,执行效率高
可以存储不同过期时间的任务
缺点
没有重试机制,当单线程执行任务时只需在执行成功时再移除任务,但当需要多线程取任务执行时,需要额外增加1个集合,用于保存执行中的任务,执行成功则删除,执行失败则回滚到原集合
四、消息队列-延迟消息 延迟消息,指在发送消息时需指定延迟时间,消息发送成功后不会立刻消费,而是先存储在消息队列服务器,等到达指定的时间后才会被消费。
那么消息队列是怎么实现延迟功能的,消息队列轮询大批量消息不会有性能问题吗,拉Rocketmq源码看了下,大致是这么个思路:设定18个不同延迟时间的队列,每个队列的消息按顺序写入,轮询队列时只需要按顺序检查是否到达指定时间,有则发送消息并记录offset
轮询的流程图:
整体流程图:
源码分析:
1、写入CommitLog
时修改topic
和queueId
防止消费,并记录原始topic
和queueId
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 // org.apache.rocketmq.store.CommitLog.putMessage() if (msg.getDelayTimeLevel() > 0) { // 超过maxLevel的调为maxLevel if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } // 修改topic为SCHEDULE_TOPIC_XXXX topic = ScheduleMessageService.SCHEDULE_TOPIC; // 决定投递到哪个队列:queueId = delayLevel - 1 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 保存真实的topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); }
2、修改tagCode为发送时间
1 2 3 4 5 6 7 8 9 10 11 12 13 // org.apache.rocketmq.store.CommitLog.checkMessageAndReturnSize() String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) { int delayLevel = Integer.parseInt(t); if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel(); } if (delayLevel > 0) { // 将tagsCode替换为发送时间(storeTimestamp + 延迟时间) tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp); } }
3、每个延迟等级都新建定时任务,轮询队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 // org.apache.rocketmq.store.schedule.ScheduleMessageService public void start() { // 对于每个level都建一个timerTask for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); // <delayLevel, consumeQueue.offset> Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { // 1s后执行,任务内部每100ms执行一次 this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } } // org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask.executeOnTimeup public void executeOnTimeup() { ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; if (cq != null) { SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); try { long nextOffset = offset; int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); long now = System.currentTimeMillis(); // 取tagCode作为投递时间 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; if (countdown <= 0) { // 已经超时 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner); if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 当前任务ok,顺序检查下一个任务是否到期 continue; } } else { // 未到超时时间 ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } } ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); }
RocketMQ 4.x为了性能,在延迟时间上有所妥协,仅支持固定的延迟等级。在5.x中引入了时间轮,支持任意时间的延迟,大致思路如图:
借助延迟消息的方案思路如下:
创建订单成功后,发送延迟消息到mq
用户支付后删除对应消息
收到消息后进行处理
由于发送消息可能失败,需要定时轮询数据库,处理超时未付款的订单
1 2 message.setDelayTimeLevel(16) producer.send(message);
优点
把轮询的压力给到消息队列,业务中的逻辑变轻松,且数据库压力变小
缺点
延迟消息其实并不是一种特别好的方案,缺点如下
消息过多:使用消息队列往往是因为数据过多,数据库轮询无法满足需求,那么如果在每个订单创建时都发送延迟消息,就需要考虑消息积压的问题。可实际上超时未付款的订单只是极少数,正常来说用户是会付款或手动取消订单的。所以大部分数据都是无用消息,平白无故浪费资源,增加成本
消息队列的限制:各个项目对于延迟关闭的需求不同,而且有变化的可能,但消息队列并不是那么灵活,比如对延迟时间的限制
RabbitMQ:延时最大为 2^32-1
毫秒,约49天,超过的会被立刻消费
RocketMQ 4.x:18个延迟级别,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
RocketMQ 5.x:最大默认3天
可靠性问题:任何额外引入的中间件都要考虑其可靠性,比如消息发送失败需要额外的兜底方案,消息延迟,排查问题不方便等。在这种场景下使用消息队列,意味着更复杂的开发和维护流程,问题可能来自于消息队列的异常或开发人员不了解原理,导致使用不当
总结 项目应将可靠性、可维护性放在首位,尤其是对于小项目或刚起步的项目,数据库轮询或redis轮询完全足够了,没必要为一个超时功能,额外引入消息队列这么重的中间件,过度设计徒增成本。即使采用了redis或消息队列等方案,也最好保留数据库轮询方案,作为异常情况的兜底