• 主页
  • 架构
  • 编程语言
  • 数据存储
  • 网络
  • VMware
  • 服务器
  • 组网
  • AI
  • 算法系列
  • 设计模式
  • 读书笔记
  • 思考
  • 工具
  • 其它技术

  • 主页
  • 架构
  • 编程语言
  • 数据存储
  • 网络
  • VMware
  • 服务器
  • 组网
  • AI
  • 算法系列
  • 设计模式
  • 读书笔记
  • 思考
  • 工具
  • 其它技术

限流实现2

2024-08-18

简介

上一篇文章 限流实现1 已经介绍三种限流方案

  • 随机拒流
  • 计数器方式
  • 基于滑动时间窗口限流

剩下的几种本来打算能立即写完,没想到一下三个月过去了,很是尴尬。本次主要实现如下两种算法

  • 令牌桶算法
  • 漏斗算法

算法的具体实现可以在github上查看 https://github.com/shidawuhen/asap/tree/master/controller/limit

下面先来介绍一下这两种算法

令牌桶算法

算法说明

令牌桶算法(Token Bucket):是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。令牌桶算法示意图如下所示:

一般的流程为:

a. 按特定的速率向令牌桶投放令牌

b. 根据预设的匹配规则先对报文进行分类,不符合匹配规则的报文不需要经过令牌桶的处理,直接发送;

c. 符合匹配规则的报文,则需要令牌桶进行处理。当桶中有足够的令牌则报文可以被继续发送下去,同时令牌桶中的令牌量按报文的长度做相应的减少;

d. 当令牌桶中的令牌不足时,报文将不能被发送,只有等到桶中生成了新的令牌,报文才可以发送。这就可以限制报文的流量只能是小于等于令牌生成的速度,达到限制流量的目的。

大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。

令牌桶能允许突发,是因为令牌桶一般有两个值,一个是桶容量,一个是单位时间投放令牌量。如果桶容量大于令牌单位时间投放量,而且单位时间消耗量比投放量少,则令牌数量最终会达到桶容量最大值。此时如果大量请求到达,会将所有令牌消耗,实现了允许突发的效果。

算法实现

该算法有几个核心点

  1. 因为要更新令牌数量,所以需要加锁
  2. 定时向桶中放入令牌,有两种方式,一种是起goroutine,定时投放,另一种是在判断是否还有足够令牌的时候,根据投放情况进行投放。这次实现使用的是第二种方法,整个架构会更简单一些。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package limit

import (
"github.com/gin-gonic/gin"
"net/http"
"sync"
"time"
)

// @Tags limit
// @Summary 令牌桶拒流
// @Produce json
// @Success 200 {string} string "成功会返回ok"
// @Failure 502 "失败返回reject"
// @Router /limit/tokenreject [get]
type TokenBucket struct {
rate int64 //固定的token放入速率, r/s
capacity int64 //桶的容量
tokens int64 //桶中当前token数量
lastTokenSec int64 //桶上次放token的时间戳 s

lock sync.Mutex
}

func (l *TokenBucket) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()

now := time.Now().Unix()
l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate // 先添加令牌
if l.tokens > l.capacity {
l.tokens = l.capacity
}
l.lastTokenSec = now
if l.tokens > 0 {
// 还有令牌,领取令牌
l.tokens--
return true
} else {
// 没有令牌,则拒绝
return false
}
}

func (l *TokenBucket) Set(r, c int64) {
l.rate = r
l.capacity = c
l.tokens = 0
l.lastTokenSec = time.Now().Unix()
}

func CreateTokenBucket()*TokenBucket{
t := &TokenBucket{}
t.Set(1,5)
return t
}

var tokenBucket *TokenBucket = CreateTokenBucket()

func TokenReject(c *gin.Context) {
//fmt.Println(tokenBucket.tokens)
if !tokenBucket.Allow() {
c.String(http.StatusBadGateway, "reject")
return
}
c.String(http.StatusOK, "ok")
}

漏桶算法

算法说明

漏桶作为计量工具(The Leaky Bucket Algorithm as a Meter)时,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:

  • 一个固定容量的漏桶,按照常量固定速率流出水滴;
  • 如果桶是空的,则不需流出水滴;
  • 可以以任意速率流入水滴到漏桶;
  • 如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。

漏桶法限流很好理解,假设我们有一个水桶按固定的速率向下方滴落一滴水,无论有多少请求,请求的速率有多大,都按照固定的速率流出,对应到系统中就是按照固定的速率处理请求。

示意图如下:

算法实现

查阅了一下相关资料,主要的算法实现有三种。学习完这几种实现后,我怀疑我是不是理解错了,感觉还没有计数拒流方便好用。如果我理解的有误,大家也可以告诉我。

这三种方式为:

令牌桶算法变种

桶的大小就是单位时间内能流出的最大量,这种就不写了。

计数拒流变种

该方法在指定时间将可用空间置为初始值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package limit

import (
"fmt"
"github.com/gin-gonic/gin"
"net/http"
"sync"
"time"
)

type LeakyBucket struct {
// 容量
capacity int64
// 剩余大小
remaining int64
// 下一次的重置容量时间
reset time.Time
// 重置容量时间间隔
rate time.Duration
mutex sync.Mutex
}
func (b *LeakyBucket) Allow() bool {
b.mutex.Lock()
defer b.mutex.Unlock()
if time.Now().After(b.reset) { // 需要重置
b.reset = time.Now().Add(b.rate) // 更新时间
b.remaining = b.capacity // 重置剩余容量
}
fmt.Println(b.remaining)
if b.remaining > 0 { // 判断是否能过
b.remaining--
return true
}
return false
}

func (b *LeakyBucket) Set(r time.Duration, c int64) {
b.rate = r
b.capacity = c
b.remaining = c
b.reset = time.Now().Add(b.rate)
}

func CreateLeakyBucket(r time.Duration,c int64) *LeakyBucket {
t := &LeakyBucket{}
t.Set(r, c)
return t
}

var leakyBucket *LeakyBucket = CreateLeakyBucket(time.Second*2,10)
func LeakyReject(c *gin.Context) {
if !leakyBucket.Allow() {
c.String(http.StatusBadGateway, "reject")
return
}
c.String(http.StatusOK, "ok")
}

真固定速率

这个算法的的实现,根据uber团队开源的 github.com/uber-go/ratelimit 而来。

该实现保证如果有大量请求,每一个请求会按照规定的时间间隔执行。如设置1s处理100个请求,则每10ms会处理一个。

如果长时间没有请求,仍会产生请求在短时间内被处理完毕的情况。当然对于这种情况可以很容易修复,大家可以思考一下如何修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package limit

import (
"fmt"
"github.com/andres-erbsen/clock"
"github.com/gin-gonic/gin"
"net/http"
"sync"
"time"
)

//真固定速率
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}

type limiter struct {
sync.Mutex // 锁
last time.Time // 上一次的时刻
sleepFor time.Duration // 需要等待的时间
perRequest time.Duration // 每次的时间间隔
maxSlack time.Duration // 最大的富余量
clock Clock // 时钟
}

// Take 会阻塞确保两次请求之间的时间走完
// Take 调用平均数为 time.Second/rate.
func (t *limiter) Take() time.Time {
t.Lock()
defer t.Unlock()

now := t.clock.Now()

// 如果是第一次请求就直接放行
if t.last.IsZero() {
t.last = now
return t.last
}

// sleepFor 根据 perRequest 和上一次请求的时刻计算应该sleep的时间
// 由于每次请求间隔的时间可能会超过perRequest, 所以这个数字可能为负数,并在多个请求之间累加
t.sleepFor += t.perRequest - now.Sub(t.last)
fmt.Println(t.sleepFor)
// 我们不应该让sleepFor负的太多,因为这意味着一个服务在短时间内慢了很多随后会得到更高的RPS。
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}

// 如果 sleepFor 是正值那么就 sleep
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
return t.last
}

func NewLimiter(rate int) *limiter {
l := &limiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: -10 * time.Second / time.Duration(rate),
}

if l.clock == nil {
l.clock = clock.New()
}
return l
}

var rl = NewLimiter(100) // per second,每秒100个请求
func LeakyRejectFixedRate(c *gin.Context) {
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
c.String(http.StatusOK, "ok")
}

演示结果如下:

总结

学习了令牌桶和漏桶算法,结合这几年工作中的具体场景,感觉令牌桶算法的实用价值更大一些。

下面是令牌桶和漏桶对比:

  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;
  • 漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
  • 令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌),并允许一定程度突发流量;
  • 漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;
  • 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率;
  • 两个算法实现可以一样,但是方向是相反的,对于相同的参数得到的限流效果是一样的。

最后,给大家展示一下各种限流算法的比较

资料

  1. 限频方案比较

  2. 高并发系统限流-漏桶算法和令牌桶算法

  3. 令牌桶与漏桶算法

  4. 漏桶、令牌桶限流的Go语言实现

扫一扫,分享到微信

微信分享二维码
HTTPS连接过程
一个与运算引发的事故
© 2025 John Doe
Hexo Theme Yilia by Litten