百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

如何实现延迟队列(JDK/mysql/redis/Rabbit)

csdh11 2025-03-12 13:40 19 浏览

何为延迟队列

队列,即先进先出的数据结构,就和食堂打饭一样,排在最前面的先打饭,打完饭就走;延迟队列即队列中的元素相比以往多了一个属性特征:延迟。延迟队列中的每个元素都指定了延迟时间,表示该元素到达指定时间之后将出队进行处理。其实从上述定义来看,与其说是延迟队列,不如说它是一个以时间为权重最小堆结构

那么延迟队列有什么用呢?我们生活中其实平时接触到很多可以使用延迟队列来解决的例子:

  • 订单超时30分钟未付款将自动关闭
  • 会议系统中,会议开始前10分钟,发送会议提醒
  • 夏天晚上时,我们经常会给空调设置指定时长的时间,到时空调自动关闭
  • 再比如微波炉、烤箱、等等

可以发现延迟队列想要实现的功能其实就是一个定时任务调度的一种。

延迟队列实现方式

延迟队列实现的方式有很多种,具体采用哪种去实现,和我们的业务背景、业务诉求都息息相关,不同的实现方式都有其适用的应用场景,我这里将延迟队列分为两类:单机延迟队列分布式延迟队列

1. 单机实现

JDK 提供了DelayedQueue可以实现延迟队列的目的。其类图如下:

可以看到DelayedQueue是一个阻塞队列,其队列中的元素必须实现Delayed接口:

public interface Delayed extends Comparable {
    long getDelay(TimeUnit unit);
}

其中getDelay返回代表该元素的一个在队列中可存在的时间,通过这种方式来实现元素的延迟弹出。接下来看订单超时30秒将自动关闭的实际例子:

public class JDKDelayQueueTest {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private static final DelayQueue DELAY_QUEUE = new DelayQueue<>();

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {

        EXECUTOR_SERVICE.submit(() -> {
            while (true) {
                if (!DELAY_QUEUE.isEmpty()) {
                    Order order = DELAY_QUEUE.poll();
                    if (order != null) {
                        System.out.println(order.getOrderId() + " 超时关闭与:" + FORMATTER.format(LocalDateTime.now()));
                    }

                }
                TimeUnit.MILLISECONDS.sleep(1000);
            }
        });
        EXECUTOR_SERVICE.submit(() -> {
            try {
                DELAY_QUEUE.add(new Order("黄焖鸡订单"));
                TimeUnit.SECONDS.sleep(5);
                DELAY_QUEUE.add(new Order("麻辣香锅订单"));
                TimeUnit.SECONDS.sleep(10);
                DELAY_QUEUE.add(new Order("石锅拌饭订单"));
            } catch (Exception e) {

            }

        });

    }

    public static class Order implements Delayed {

        private final LocalDateTime expireTime;
        private final String orderId;

        public Order(String orderId) {
            this.expireTime = LocalDateTime.now().plusSeconds(30);
            this.orderId = orderId;
            System.out.println(orderId + " 创建于:" + FORMATTER.format(LocalDateTime.now()));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return LocalDateTime.now().isAfter(expireTime) ? -1 : 1;
        }

        @Override
        public int compareTo(Delayed targetOrder) {
            // 谁的过期时间最早谁就排最前面
            return this.expireTime.isBefore(((Order) targetOrder).getExpireTime()) ? -1 : 1;
        }

        public String getOrderId() {
            return orderId;
        }

        public LocalDateTime getExpireTime() {
            return expireTime;
        }
    }
}

输出:

黄焖鸡订单 创建于:2021-08-21 18:26:30
麻辣香锅订单 创建于:2021-08-21 18:26:35
石锅拌饭订单 创建于:2021-08-21 18:26:45
黄焖鸡订单 超时关闭与:2021-08-21 18:27:00
麻辣香锅订单 超时关闭与:2021-08-21 18:27:05
石锅拌饭订单 超时关闭与:2021-08-21 18:27:15

DelayQueue实现方式小结

这种方式的优点就是实现简单,不复杂,但是其缺点也比较多:不具备可扩展性内存限制无持久化机制,数据容易丢失。


分布式实现

2. 数据库轮询

数据库论询的方式相对而言也比较好理解,后台启动定时任务每隔一段时间扫描指定的数据库表每一行数据,获取出到达指定延迟时间的行进行处理,所以使用该方式重要的就三个要素:

1)捞取任务
扫描数据库的后台任务,可以使用分布式任务去扫,比如A任务扫描limit 0,100满足条件的数据行,B任务扫描limit 100,200满足条件的数据行

2)执行任务
一般来说讲究分工协作,第一步中的分布式线程任务专门用来捞取任务,那么捞取到的任务可以再次扔给另外专门用户处理任务的线程中

3)数据库表设计
可以在表中增加一个字段来表示延迟时间,比如针对上面的订单超时30秒关闭,我们可以增加一个字段timeout,可以是此时间的毫秒数来记录订单的超时时间,那么此时我们的SQL就可以是:

select * from order where ${now} >= timeout limit ${start},100;

数据库轮询实现方式小结

采用这种方式可以看到首先我们需要查询数据库,那么查询数据库就意味着存在查询耗时,那么可能最终导致的就是实时性不高,但是它的优点在于天生满足任务持久化机制,不用担心延迟任务丢失。

3.通过Redis实现

Redis的五大数据类型中的zset数据类型中,包含一个称为score的属性,该数据类型中所有元素都会按照score进行排序,所以如果将score作为我们的延迟时间的时间戳,那么我们可以通过命令Zrangebyscore来获取满足条件的数据,然后交给我们的任务处理线程去处理,其实整个实现思想和数据库轮循是一样的,只不过数据存储结构由数据库转变成了redis,准确来说redis也是数据库,只不过不同的存储结构带来的影响就是适用场景的不同罢了。

那么如果通过Redis来实现延迟队列,大概会有如下几步:

1) 增加任务

zadd tasks ${过期时间戳} ${任务相关数据}

2)捞取任务

ZRANGEBYSCORE tasks -inf ${当前时间戳} WITHSCORES

捞取过期时间早于当前时间的这部分任务

3)执行任务 接下来就是执行,这个就没什么好说的了

关于redis zset数据结构以及命令可以看这里:
https://www.runoob.com/redis/redis-sorted-sets.html

一些优化点

1.在添加延迟任务时,可以通过对任务id进行hash分散至多个redis key,可以避免所有任务存储在一个key中导致大key从而影响元素的添加和查找性能

2.每个key独自拥有一个线程处理

3.每个key的线程只负责拉取需要处理的数据,然后再转发至消息队列中,不做任何其他处理,可以提升处理速度,消息消费者可扩展性好,性能不够,机器来凑

redis实现方式小结

redis因为其都是内存中操作,所以查询插入速度和mysql来比都是非常快的,所以实时性会比mysql高,虽然redis也能满足任务数据的持久化,但是无法保证任务不丢失,所以这里持久性会比mysql稍弱一点


4.使用消息队列

我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列

  • RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
  • lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。结合以上两个特性,就可以模拟出延迟消息的功能.

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  1. 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  2. 消息因为设置了TTL而过期。
  3. 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。

不同实现方式的对比

实现方式

复杂度

数据量

持久化,数据丢失

扩展性

实时性

jdk DelayQueue

简单

由于程序内存限制,适用于少数据量

无持久化

mysql 轮询

稍微复杂

可支持大数据量

可保证持久化,保证任务不丢失

可扩展

由于查询开销,稍弱

redis zet

稍微复杂

可支持大数据量

可尽量保证持久化,不保证任务不丢失

可扩展

RabbitMQ

稍微复杂

可支持大数据量

可保证持久化,保证任务不丢失

可扩展

结语

除了以上实现方式,还有其他比如通过Rabbit MQTTL死信队列来实现:每一个消息带有TTL属性,该TTL即延迟任务的延迟时间,只要超过指定时间没被消费,此消息将被转至死信队列中,我们可以监听死信队列消费消息进而达到延迟任务的目的;还有时间轮转算法等,时间有限,日后再学,日后再讲。

相关推荐

教学楼里那种嵌着小石子的水磨石地面,是怎么整出来的? | 有趣的制造

今天的选题是之前小可爱「花凉」在后台发消息问的~看过以后念念不忘,满脑子都是小时候在教学楼冰冷地面上摔的跤,记不起来是不是在这种地面上磕掉的门牙...昨天发了预告后,有小可爱纷纷表示「就是这种地板,像...

教学楼里那种嵌着小石子的水磨石地面,是怎么整出来的?

话说有多少小可爱不想学习时,没事数着水磨石地面的小石子玩,然后互相评比哪颗石子最好看。到头来书又没有背完,课也没好好上,就怪地板有迷幻效果,扰乱了好好学习的坚定意志。(小编觉得即使换成瓷砖,你们也可能...

性能调优实战:Spring Boot 多线程处理SQL IN语句大量值的优化方案

环境:SpringBoot3.4.0...

RMAN备份监控及优化总结(rman全备份)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

记Oracle中快速获取表及其各个字段注释的方法

简述java开发中,用过JPA的道友应该知道,我们可以通过写java代码自动生成对应的数据表;但这有个问题是,列名的注释并没有帮我们一起添加到数据库去,尤其在一些开发测试生产三个环境隔离的,就很不友好...

Oracle 数据库日常巡检之检查数据库cpu、I/O、内存性能

记录数据库的cpu使用、IO、内存等使用情况,使用vmstat,iostat,sar,top等命令进行信息收集并检查这些信息,判断资源使用情况。1.CPU使用情况:...

Oracle案例:ORA-00600: internal error code, arguments: 「4187」

本案例客户来自某省电信,alert日志大量的ORA-00600[4187]报错,已经影响到业务正常运行。...

MySQL索引失效的10大陷阱:从隐式类型转换到索引选择性全面优化

索引是MySQL性能优化的核心武器,但错误的使用场景可能让索引完全失效,导致查询性能断崖式下降。本文通过实际案例,深入剖析索引失效的典型场景及其底层原理,并提供可落地的解决方案。一、索引失效的核心原...

oracle查询语句执行计划分析(oracle如何查看sql执行计划)

1命令行开启配置#显示查询结果setautotraceon#不显示查询结果setautotracetraceonly2执行查询语句...

面试官:说说Oracle数据库result cache的原理是什么?

概述前面已经用实验给大家介绍了ResultCache相关内容,今天主要讨论一下Oracle11gResultCache的深层原理。从参数看,Oracle提供了ClientResultCac...

Oracle817 export 时ORA-06553和ORA-00904处理

现象:数据库版本8.1.7...

Oracle案例:一次gc buffer busy acquire诊断

本案例来自某客户两节点rac的一次生产故障,现象是大面积的gcbufferbusyacquire导致业务瘫痪。...

说文解字:“雪”字本身在造字时就很浪漫!

这是雪山的“雪”字。可是你知道吗?“雪”这个字其实和“山”是没有任何关系的。这个字下半部分“彐”并不是一座翻倒的山,而是一只手的意思。(凡是带“彐”的汉字,其实都和手有关。)“雪”字的商代甲骨文形状,...

应用最广的两类数据库的区别、优势对比、查询优化方法及案例实践

 1、通用数据库分类  1.1关系型数据库  关系型数据库是多个二维数据表的集合,数据以二维数据表的形式进行存储,数据表之间可以通过应用程序或者数据的主、外键建立特定的关联关系,让数据之间存在特定的...

【SQL】SQL 语法差异大全(PgSQL/MySQL/Oracle/TiDB/OceanBase)

以下是针对不同数据库系统的SQL语法差异总结,按功能分类展示:一、基础查询1.分页查询...