百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

五分钟搞懂分布式流控算法原理和滑动窗口设计实现

csdh11 2025-03-12 13:41 23 浏览

流控的使用场景

  • 保护系统稳定性: 流控算法可以限制系统的请求流量,防止突发的大流量请求导致系统资源耗尽,从而保护系统的稳定性,避免系统崩溃或性能下降。
  • 避免资源竞争: 在高并发的情况下,如果不进行流控,多个请求可能会竞争有限的资源(如数据库连接、线程池等),导致资源竞争和资源耗尽,进而影响系统的响应时间和可用性。
  • 防止恶意攻击: 流控算法可以限制来自单个IP地址或用户的过多请求,防止恶意攻击和滥用系统资源,提高系统的安全性。
  • 提高服务质量: 通过合理的流控策略,可以确保系统能够正常处理合理范围内的请求,从而提高服务的质量和稳定性,减少服务的不可用或延迟现象。

固定窗口算法

图片来自互联网,如有侵权请联系作者删除

设计原理

维护一个单位时间内的计数值,每当一个请求通过时,就将计数值加1,当计数值超过预先设定的阈值时,就拒绝单位时间内的其他请求

问题

假设我们设定1秒内允许通过的请求阈值是99,如果有用户在时间窗口的最后几毫秒发送了99个请求,紧接着又在下一个时间窗口开始时发送了99个请求,那么这个用户其实在一秒内成功请求了198次,显然超过了阈值但并不会被限流(会有突刺问题)

滑动窗口算法

图片来自互联网,如有侵权请联系作者删除

设计原理

假设我们设定1秒内允许通过的请求是200个,但是在这里我们需要把1秒的时间分成多格,假设分成5格(格数越多,流量过渡越平滑),每格窗口的时间大小是200毫秒,每过200毫秒,就将窗口向前移动一格

问题

流量的过渡是否平滑依赖于我们设置的窗口格数也就是统计时间间隔,格数越多,统计越精确,但是具体要分多少呢?

小总结: 固定窗口 和 滑动窗口 解决:单位时间总流量

漏斗算法

图片来自互联网,如有侵权请联系作者删除


设计原理

漏斗算法以一个常量限制了出口流量速率,因此漏斗算法可以应对平滑突发的流量。其中漏斗作为流量容器我们可以看做一个FIFO的队列,当入口流量速率大于出口流量速率时,因为流量容器是有限的,当超出流量容器大小时,超出的流量会被丢弃

优点

  1. 可以平滑限制请求的处理速度,避免瞬间请求过多导致系统崩溃或者雪崩
  2. 可以控制请求的处理速度,使得系统可以适应不同的流量需求,避免过载或者过度闲置。
  3. 可以通过调整桶的大小和漏出速率来满足不同的限流需求,可以灵活地适应不同的场景。

问题

因为漏桶算法限制了流出速率是一个固定常量值,所以漏桶算法不支持出现突发流出流量。但是在实际情况下,流量往往是突发的。

令牌桶算法

图片来自互联网,如有侵权请联系作者删除

设计原理

以恒定速率往令牌桶里加入令牌,令牌桶被装满时,多余的令牌会被丢 弃。当请求到来时,会先尝试从令牌桶获取令牌(相当于从令牌桶移除一个令牌),获取成功则请求被放行,获取失败则阻塞或直接拒绝请求

算法实现

目前来说滑动窗口的实现分为两种【环形和线性】,环形的代表是Sentinel的LeapArray,线性的是EasyRetry自研的SlidingWindow,下面分别介绍这两种的设计与实现

LeapArray【Sentinel】

核心字段

javaprotected int windowLengthInMs;  // 窗口长度 (intervalInMs / sampleCount)
protected int sampleCount;  // 总窗口间隔
protected int intervalInMs; // 总窗口时间 单位毫秒
private double intervalInSecond; // 总窗口时间 单位秒

核心算法

计算起始时间【windowStart】 => 利用求余运算,保证区间内桶的开始时间是一致的

javaprotected long calculateWindowStart(long timeMillis) {
  return timeMillis - timeMillis % windowLengthInMs;
}

计算下标 【timeIdx】 => 利用除法取整和求余运算,保证区间内桶的下标位是一致的

javapublic int calculateTimeIdx(long timeMillis) {
  long timeId = timeMillis / windowLengthInMs;
  return (int) (timeId % array.length());
}

获取窗口对象

java// 根据给定的时间戳计算出在存储桶数组中的索引位置。存储桶数组是一个循环数组,存储了不同时间窗口的计数桶。
int idx = calculateTimeIdx(timeMillis);

// 根据给定的时间戳计算出对应时间窗口的开始时间。
long windowStart = calculateWindowStart(timeMillis);

while (true) {
    // 获取环形数组中对象下标的窗口对象
    WindowWrap old = array.get(idx);
    if (old == null) {
        // 不存在则创建窗口
        WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        // 通过CAS(Compare-And-Swap)原子操作来确保只有一个线程能够成功插入
        if (array.compareAndSet(idx, null, window)) {
           // 如果插入成功,返回新创建的计数桶;
            return window;
        } else {
        //  如果插入失败,线程将让出时间片,等待下一次循环。
            Thread.yield();
        }
    } else if (windowStart == old.windowStart()) {
    // 如果找到的计数桶的开始时间与给定时间戳的开始时间一致,说明该计数桶是最新的,并且当前时间戳处于这个时间窗口内。直接返回找到的计数桶。
        return old;
    } else if (windowStart > old.windowStart()) {
// 如果找到的计数桶的开始时间早于给定时间戳的开始时间,说明该计数桶已过时(不再使用)。
// 在这种情况下,算法会尝试获取一个更新锁(updateLock)来对这个过时的计数桶进行重置
//如果成功获取锁,就会调用 `resetWindowTo` 方法来重置过时的计数桶,并返回重置后的计数桶。如果获取锁失败,线程将让出时间片,等待下一次循环。
        if (updateLock.tryLock()) {
            try {
                return resetWindowTo(old, windowStart);
            } finally {
                updateLock.unlock();
            }
        } else {
            Thread.yield();
        }
    } else if (windowStart < old.windowStart()) {
    // 如果找到的计数桶的开始时间晚于给定时间戳的开始时间,这是一个不应该出现的情况,代码中并没有详细处理这种情况,只是返回一个新创建的空计数桶。
        return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
    }
}

完整代码实现:
https://github.com/alibaba/Sentinel/blob/v1.8.0/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java

SlidingWindow[【EasyRetry】]https://www.easyretry.com/)

核心字段

java/**  
* 这是一个用于存储数据的 TreeMap,其中的键为窗口期的开始时间(LocalDateTime 类型),
* 值为一个并发链表队列(ConcurrentLinkedQueue),用于存储在该窗口期内的数据。  
*/  
public final TreeMap<LocalDateTime, ConcurrentLinkedQueue> saveData = new TreeMap<>();  
  
/**  
* 总量窗口期阈值,指定一个窗口期内数据的最大数量。
*/  
private final Integer totalThreshold;  
  
/**  
* 开启的窗口数据预警阈值,当存活的窗口数量过多时会进行预警。
*/  
private final Integer windowTotalThreshold;  
  
/**  
* 一个监听器列表,用于处理达到窗口期阈值时的操作。
*/  
private final List<Listener> listeners;

/**  
* 新增窗口锁  
*/  
private static final ReentrantLock SAVE_LOCK = new ReentrantLock();  
  
/**  
* 到达时间窗口期或者总量窗口期锁  
*/  
private static final ReentrantLock NOTICE_LOCK = new ReentrantLock();

新增数据&&开启新窗口

javapublic void add(T data) {

    LocalDateTime now = LocalDateTime.now();
    if (isOpenNewWindow(now)) {

        SAVE_LOCK.lock();
        LocalDateTime windowPeriod = now.plus(duration, chronoUnit);
        try {

            // 防止开启两个间隔时间小于窗口期的窗口
            if (isOpenNewWindow(now)) {
                ConcurrentLinkedQueue list = new ConcurrentLinkedQueue<>();
                list.add(data);

                LogUtils
                    .info(log, "添加新数据 [{}] [{}] size:[{}]", windowPeriod, Thread.currentThread().getName(), list.size());
                saveData.put(windowPeriod, list);

                // 扫描n-1个窗口,是否过期,过期则删除
                removeInvalidWindow();

                // 超过窗口阈值预警
                alarmWindowTotal();

            } else {
                oldWindowAdd(data);
            }

        } finally {
            SAVE_LOCK.unlock();
        }

    } else {
        oldWindowAdd(data);
    }

}

往已存在的窗口期内添加数据

java    private void oldWindowAdd(T data) {

        LocalDateTime windowPeriod = getNewWindowPeriod();
        // 添加数据
        ConcurrentLinkedQueue list = saveData.get(windowPeriod);
        list.add(data);

        // 到达总量窗口期,将数据传递给监听器进行处理。
        if (list.size() >= totalThreshold) {
            doHandlerListener(windowPeriod);
        }

    }

处理到达窗口期的数据监听器

javaprivate void doHandlerListener(LocalDateTime windowPeriod) {

    NOTICE_LOCK.lock();

    try {

        ConcurrentLinkedQueue list = saveData.get(windowPeriod);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        // 深拷贝
        ConcurrentLinkedQueue deepCopy = new ConcurrentLinkedQueue<>(list);
        clear(windowPeriod, deepCopy);

        if (CollectionUtils.isEmpty(deepCopy)) {
            return;
        }

        for (Listener listener : listeners) {
            // 用户自定义实现具体的数据处理逻辑
            listener.handler(new ArrayList<>(deepCopy));
        }

    } catch (Exception e) {
        log.error("到达总量窗口期通知异常", e);
    } finally {
        NOTICE_LOCK.unlock();
    }

}

删除过期窗口

扫描n-1个窗口,是否过期,过期则删除 过期条件为窗口期内无数据

java    private void removeInvalidWindow() {

        for (int i = 0; i < saveData.size() - 1; i++) {
            Map.Entry<LocalDateTime, ConcurrentLinkedQueue> firstEntry = saveData.firstEntry();
            if (CollectionUtils.isEmpty(firstEntry.getValue())) {
                saveData.remove(firstEntry.getKey());
            }
        }
    }

完成代码实现:
https://gitee.com/aizuda/easy-retry/blob/master/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java

项目实战

本篇文章多次提到EasyRetry,想必小伙伴们都很好奇,下面简单介绍一下EasyRetry

EasyRetry是致力提高分布式业务系统一致性的分布式重试平台,它提供了控制台任务观测、可配置的重试策略、重试后执行回调以及丰富地告警配置等功能。通过这些手段,可以对异常数据进行全面监测和回放,从而在确保系统高可用性的同时,大大提升数据的一致性。详细了解

EasyRetry为什么使用滑动窗口?

场景一 客户端批量上报异常数据

EasyRetry作为高性能的分布式重试平台,从设计之初就充分考虑了重试数据上报的集中性和异步特点.使用滑动窗口批量上报异常数据可以减少网络传输的频率,从而降低网络开销。滑动窗口可以将一定时间内的异常数据进行缓冲和合并,然后一次性发送,减少了频繁的网络通信。

具体使用案例

java// 滑动窗口的参数配置
SlidingWindowConfig slidingWindowConfig = easyRetryProperties.getSlidingWindow();  
  
slidingWindow = SlidingWindow  
    .Builder  
    .newBuilder()  
    .withTotalThreshold(slidingWindowConfig.getTotalThreshold())  
    .withWindowTotalThreshold(slidingWindowConfig.getWindowTotalThreshold())  
    .withDuration(slidingWindowConfig.getDuration(), slidingWindowConfig.getChronoUnit())  
    .withListener(new ReportListener())  
.build();  
  
slidingWindow.start();

场景二 重试流量的统计(计划在后期版本实现)

通过LeapArray对重试流量进行统计,EasyRetry将进一步实现重试风暴的管控,从而确保服务的稳定性并提升服务端的质量。

有兴趣的小伙伴可以关注EasyRetry

相关推荐

NUS邵林团队发布DexSinGrasp基于强化学习实现物体分离与抓取统一

本文的作者均来自新加坡国立大学LinSLab。本文的共同第一作者为新加坡国立大学实习生许立昕和博士生刘子轩,主要研究方向为机器人学习和灵巧操纵,其余作者分别为硕士生桂哲玮、实习生郭京翔、江泽宇以及...

「PLC进阶」如何通过编写SCL语言程序实现物料分拣?

01、前言SCL作为IEC61131-3编程语言的一种,由于其高级语言的特性,特别适合复杂运算、复杂数学函数应用的场合。本文以FactoryIO软件中的物料分拣案例作为硬件基础,介绍如何通过SCL来实...

zk源码—5.请求的处理过程一(http1.1请求方法)

大纲1.服务器的请求处理链...

自己动手从0开始实现一个分布式 RPC 框架

前言为什么要自己写一个RPC框架,我觉得从个人成长上说,如果一个程序员能清楚的了解RPC框架所具备的要素,掌握RPC框架中涉及的服务注册发现、负载均衡、序列化协议、RPC通信协议、Socket通信、异...

MLSys’25 | 极低内存消耗:用SGD的内存成本实现AdamW的优化性能

AIxiv专栏是机器之心发布学术、技术内容的栏目。过去数年,机器之心AIxiv专栏接收报道了2000多篇内容,覆盖全球各大高校与企业的顶级实验室,有效促进了学术交流与传播。如果您有优秀的工作想要分享,...

线程池误用导致系统假死(线程池会自动销毁吗)

背景介绍在项目中,为了提高系统性能使用了RxJava实现异步方案,其中异步线程池是自建的。但是当QPS稍微增大之后却发现系统假死、无响应和返回,调用方出现大量超时现象。但是通过监控发现,系统线程数正常...

大型乘用车工厂布局规划(六大乘用车基地)

乘用车工厂的布局规划直接影响生产效率、物流成本、安全性和未来扩展能力。合理的布局应确保生产流程顺畅、物流高效、资源优化,并符合现代化智能制造和绿色工厂的要求。以下是详细的工厂布局规划要点:1.工厂布...

西门子 S7-200 SMART PLC 连接Factory IO的方法

有很多同学不清楚如何西门子200smart如何连接FactoryIO,本教程为您提供了如何使用西门子S7-200SMARTPLC连接FactoryIO的说明。设置PC和PLC之间的...

西门子博图高级仿真软件的应用(西门子博途软件仿真)

1.博图高级仿真软件(S7-PLCSIMAdvancedV2.0)S7-PLCSIMAdvancedV2.0包含大量仿真功能,通过创建虚拟控制器对S7-1500和ET200SP控制器进行仿真...

PLC编程必踩的6大坑——请对号入座,评论区见

一、缺乏整体规划:面条式代码问题实例:某快递分拣线项目初期未做流程图设计,工程师直接开始编写传送带控制程序。后期增加质检模块时发现I/O地址冲突,电机启停逻辑与传感器信号出现3处死循环,导致项目延期2...

统信UOS无需开发者模式安装软件包
统信UOS无需开发者模式安装软件包

原文链接:统信UOS无需开发者模式安装软件包...

2025-05-05 14:55 csdh11

100个Java工具类之76:数据指纹DigestUtils

为了提高数据安全性,保证数据的完整性和真实性,DigestUtils应运而生。正确恰当地使用DigestUtils的加密算法,可以实现数据的脱敏,防止数据泄露或篡改。...

麒麟KYLINIOS软件仓库搭建02-软件仓库添加新的软件包

#秋日生活打卡季#原文链接:...

Java常用工具类技术文档(java中工具类的作用)

一、概述Java工具类(UtilityClasses)是封装了通用功能的静态方法集合,能够简化代码、提高开发效率。本文整理Java原生及常用第三方库(如ApacheCommons、GoogleG...

软路由的用法(自动追剧配置)(软路由教学)

本内容来源于@什么值得买APP,观点仅代表作者本人|作者:值友98958248861环境和需求...