给定一个公共API,限制每个用户每秒只能调用1000次,如何实现?这个一个经典的API限速问题(API rate limiting)。
目录
初始想法
最朴素的想法就是给每一个用户配额,此时为Q,在第一次调用的时候分配给用户,然后在接下来的时间段,如果访问的此时大于Q,就直接拒绝用户的调用。随后经过一段时间后,再次分配Q。
type tf struct {
limits int
lastAsk time.Time
}
var GAP time.Duration = time.Second * 10 // 单次访问限制在10s
var Q int = 5 // 10s内限制访问5次
func limitAPI(user string, config *map[string]tf) bool {
if tmp, ok := (*config)[user]; ok {
// user在之前被分配过
if time.Now().Sub(tmp.lastAsk) < GAP {
if tmp.limits < Q {
tmp.limits += 1
return true
} else {
return false
}
} else {
tmp.limits=1 // 重置限制册数
return true // 超出次数限制
}
}else {
(*config)[user]=tf{1, time.Now()} // 添加用户
return true
}
}
这种实现方法十分朴素,可以使用Redis中设置过期时间的key来实现,但是存在一个BUG,用户可以在一个时间段的末尾发起Q次请求,然后在下一个时间段的开始又 发起Q次请求,这样,一个用户可以在很短的时间之内发起2Q次请求。这样一来就达不到限速的需求了。
Token Bucket
算法的三大要点:
- 所有的流量在放行之前需要获取一定量的 token;
- 所有的 token 存放在一个 bucket(桶)当中,每 1/r 秒,都会往这个 bucket 当中加入一个 token;
- bucket 有最大容量(capacity or limit),在 bucket 中的 token 数量等于最大容量,而且没有 token 消耗时,新的额外的 token 会被抛弃。
此外应当为每一个需求设置独立的bucket,例如为每一个用户限制访问,为每个IP限制访问,为全局设置访问,那么需要三个bucket。
简陋版本
为每一个bucket设置一个线程,完成上述的三大步骤(每次间隔 \frac{1}{GAP} 秒定时添加token, 有访问需求减一个token)。但实际上如果有大访问量来到的时候,服务器无法承受这么多的线程。
改进
我们可以在 Bucket 中存放现在的 Token 数量,然后存储上一次补充 Token 的时间戳,当用户下一次请求获取一个 Token 的时候, 根据此时的时间戳,计算从上一个时间戳开始,到现在的这个时间点所补充的所有 Token 数量,加入到 Bucket 当中。
这样的做法可以避免定时添加token,具体做法如下:
func resync(user string, config *map[string]tf) {
perSecond := float64(0.5) // 每2秒钟加入1个token
if tmp, ok := (*config)[user]; ok {
tmp.limits += int(float64(time.Now().Unix()-tmp.lastAsk.Unix()) / perSecond)
} else {
(*config)[user] = tf{10 - 1, time.Now()}
}
}
使用这种做法可以避免给每个定时器设置一个线程的缺点,同时只需要储存上一个访问的时间戳以及剩余的token的量,所需的存储空间较少。且只有用户访问的时候才进行更新,对系统资源要求较少。
使用这种方法的第一个首要问题就是Bucket放置何处?虽然Bucket占用内存很小,假设一个 Bucket 的大小为20 个字节,如果需要储存一百万个 Bucket 就需要使用 20M 的内存。而且,Bucket 从 一定意义上属于缓存数据,因为如果用户长期不使用这个 Bucket 的话,应该能够自动失效。从上面的分析,可以将 Bucket 放在 Redis 当中,每个 Bucket 使用一个Hash存放(用HSET命令进行操作,如果存在直接覆盖,不存在直接新建), 并且支持在一段时间之后,使 Bucket 失效(TTL)。
func access(user, redis_ip, password string) bool {
// 使用redis对用户访问进行控制
var GAP int64 = int64(time.Millisecond) * 1000
client := redis.NewClient(&redis.Options{
Addr: redis_ip + ":6379",
Password: password, // no password set
DB: 0, // use default DB
})
_, err := client.Get(user).Result()
if err != redis.Nil {
// 用户不存在
client.HSet(user, "limits", 10-1) // 默认每秒钟10次
client.HSet(user, "lastAck", time.Now().UnixMilli())
return true
} else {
lastAck, err := client.HGet(user, "lastAck").Int64()
if err != nil {
return false
}
interval := time.Now().UnixMilli() - lastAck
tokenRemaining, _ := client.HGet(user, "limits").Int()
if interval > GAP {
tokenRemaining = 10 // 距离上次访问超过GAP,那么token直接加满
} else {
tokenRemaining = int(math.Min(float64(int(10*interval/int64(1000))+tokenRemaining), 10)) // 最后确保不会超过10
}
client.HSet(user, "limits", time.Now().UnixMilli())
if tokenRemaining == 0 {
client.HSet(user, "limits", tokenRemaining)
return false
}else{
client.HSet(user, "limits", tokenRemaining-1)
return true
}
}
}
上述简单的实现对于多线程的性能支持很好,但是对于多线程的情况下存在争用的情况,因此需要使用redis的分布式事务/锁实现。