跳到主要内容

后端技术选型

订单超时自动关闭的方案

背景

之前遇到过一个需求,类似于超过一定时间自动关闭订单。拿电商领域的订单举例,比如订单超时自动收货、用户下单后放弃支付超过半小时后订单需要自动关闭,订单关闭后才能进行后续的流程,如解锁库存,通知用户等。依稀记得前几年背过类似的八股文,没想到有一天真能用上,特此做一份整理,如有错误欢迎指正。

一、定时任务轮询数据库

大部分人第一个想到的便是这个方案,思路如下:

  1. 生成订单时,记录预计的过期时间(expire_time = now() + 30min) ,和state一起加上索引,3s轮询数据库,查出超时未付款的订单id
  2. 通过定时器(如xxl-job)去轮询数据库,比如3s一次,查询超时未付款的订单idselect id from order where state = '未付款' and now() >= expire_time limit 500 尝试对订单ids加锁,成功的订单ids可以更新订单状态为超时未付款
  3. 定时任务有间隔,而且在表数据量较大的情况下查询会变慢,导致订单超时但定时任务可能还未关闭订单,而此时可能又有订单操作。因此在查询订单、更新订单时要检查订单是否超时未付款,超时则关闭订单

优点

实现起来简单,成本低,可靠性强,后续维护和排查问题也方便。算是主流的解决方案。

缺点

精度不够,轮询总需要设定一个时间间隔。当时间间隔很小但数据量很大时,即便有索引也会有较大的db压力。如果间隔设置较大,订单关闭就会有延迟

二、redis过期监听(不可行)

redis.conf中设置notify-keyspace-events Ex,在KeyExpirationEventMessageListener.onMessage接收消息并处理。

notify-keyspace-events Ex 
public class Test extends KeyExpirationEventMessageListener {
public Test(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}

@Override
public void onMessage(Message message, byte[] bytes) {
String orderId = message.toString();
}
}

notify-keyspace-events参数如下,可任意组合用于指定要发送的通知类型,但KE至少2选1

img

缺点

理论上很美好,可实际生产环境不建议使用,这个方案有较大缺陷

  1. 并不是key过期就立刻发送过期通知,延迟是必定存在的。当key被实际删除时才会开始生成过期事件,而实际删除key的时间是不准确的,时机分2种:
    1. 惰性删除:访问到某个过期的key时,删除该key
    2. 定时删除:定时每隔100ms,检查设置了过期时间的部分key,并删除过期的key
  2. 发送的通知并不能像消息队列那样保证送达,即有可能丢消息,比如发通知时服务正好重启。显然可靠性不足

三、redis-zset做延迟队列

Zset是一个有序集合,存储member和score,通过字典(member为key)+跳表(按score排序)实现,可以by字典精确搜索member得到节点,或by跳表搜索score得到节点。这种数据结构正好符合我们的需求。我们将预计过期时间设置为score,订单id设置为member。新建订单时插入,支付后删除节点,未付款的订单存于zset中。每秒轮询redis,通过rangeByScore获取now() > score的任务,执行完remove

优点

  1. 实现简单易维护
  2. 在内存中增删查询节点,且实际未付款订单数量少,执行效率高
  3. 可以存储不同过期时间的任务

缺点

没有重试机制,当单线程执行任务时只需在执行成功时再移除任务,但当需要多线程取任务执行时,需要额外增加1个集合,用于保存执行中的任务,执行成功则删除,执行失败则回滚到原集合

四、消息队列-延迟消息

延迟消息,指在发送消息时需指定延迟时间,消息发送成功后不会立刻消费,而是先存储在消息队列服务器,等到达指定的时间后才会被消费。

那么消息队列是怎么实现延迟功能的,消息队列轮询大批量消息不会有性能问题吗,拉Rocketmq源码看了下,大致是这么个思路:设定18个不同延迟时间的队列,每个队列的消息按顺序写入,轮询队列时只需要按顺序检查是否到达指定时间,有则发送消息并记录offset

轮询的流程图:

img

整体流程图:

img

源码分析:

1、写入CommitLog时修改topicqueueId防止消费,并记录原始topicqueueId

// org.apache.rocketmq.store.CommitLog.putMessage() 

if (msg.getDelayTimeLevel() > 0) {
// 超过maxLevel的调为maxLevel
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}

// 修改topic为SCHEDULE_TOPIC_XXXX
topic = ScheduleMessageService.SCHEDULE_TOPIC;
// 决定投递到哪个队列:queueId = delayLevel - 1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// 保存真实的topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);
msg.setQueueId(queueId);
}

2、修改tagCode为发送时间

//  org.apache.rocketmq.store.CommitLog.checkMessageAndReturnSize() 

String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);
if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
}
if (delayLevel > 0) {
// 将tagsCode替换为发送时间(storeTimestamp + 延迟时间)
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);
}
}

3、每个延迟等级都新建定时任务,轮询队列

// org.apache.rocketmq.store.schedule.ScheduleMessageService 

public void start() {
// 对于每个level都建一个timerTask
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
// <delayLevel, consumeQueue.offset>
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
// 1s后执行,任务内部每100ms执行一次
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
}
// org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask.executeOnTimeup

public void executeOnTimeup() {
ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));

long failScheduleOffset = offset;

if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();

long now = System.currentTimeMillis();
// 取tagCode作为投递时间
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

long countdown = deliverTimestamp - now;

if (countdown <= 0) {
// 已经超时
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);

if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
// 当前任务ok,顺序检查下一个任务是否到期
continue;
}
} else {
// 未到超时时间
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
}

RocketMQ 4.x为了性能,在延迟时间上有所妥协,仅支持固定的延迟等级。在5.x中引入了时间轮,支持任意时间的延迟,大致思路如图:

img

借助延迟消息的方案思路如下:

  1. 创建订单成功后,发送延迟消息到mq
  2. 用户支付后删除对应消息
  3. 收到消息后进行处理
  4. 由于发送消息可能失败,需要定时轮询数据库,处理超时未付款的订单
message.setDelayTimeLevel(16) 
producer.send(message);

优点

把轮询的压力给到消息队列,业务中的逻辑变轻松,且数据库压力变小

缺点

延迟消息其实并不是一种特别好的方案,缺点如下

  1. 消息过多:使用消息队列往往是因为数据过多,数据库轮询无法满足需求,那么如果在每个订单创建时都发送延迟消息,就需要考虑消息积压的问题。可实际上超时未付款的订单只是极少数,正常来说用户是会付款或手动取消订单的。所以大部分数据都是无用消息,平白无故浪费资源,增加成本
  2. 消息队列的限制:各个项目对于延迟关闭的需求不同,而且有变化的可能,但消息队列并不是那么灵活,比如对延迟时间的限制
    1. RabbitMQ:延时最大为 2^32-1毫秒,约49天,超过的会被立刻消费
    2. RocketMQ 4.x:18个延迟级别,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    3. RocketMQ 5.x:最大默认3天
  3. 可靠性问题:任何额外引入的中间件都要考虑其可靠性,比如消息发送失败需要额外的兜底方案,消息延迟,排查问题不方便等。在这种场景下使用消息队列,意味着更复杂的开发和维护流程,问题可能来自于消息队列的异常或开发人员不了解原理,导致使用不当

总结

项目应将可靠性、可维护性放在首位,尤其是对于小项目或刚起步的项目,数据库轮询或redis轮询完全足够了,没必要为一个超时功能,额外引入消息队列这么重的中间件,过度设计徒增成本。即使采用了redis或消息队列等方案,也最好保留数据库轮询方案,作为异常情况的兜底