一、场景描述

很多做服务接口的人或多或少的遇到这样的场景,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统。

也就是面对大流量时,如何进行流量控制?

服务接口的流量控制策略:分流、降级、限流等。本文讨论下限流策略,虽然降低了服务接口的访问频率和并发量,却换取服务接口和业务应用系统的高可用。

实际场景中常用的限流策略:

  • Nginx前端限流

按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

  • 业务应用系统限流

1、客户端限流

2、服务端限流

  • 数据库限流

红线区,力保数据库

二、常用的限流算法

常用的限流算法由:楼桶算法和令牌桶算法。本文不具体的详细说明两种算法的原理,原理会在接下来的文章中做说明。

1、漏桶算法

漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:

   

可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。

因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率.

2、令牌桶算法

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.

  令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.

三、基于Redis功能的实现

简陋的设计思路:假设一个用户(用IP判断)每分钟访问某一个服务接口的次数不能超过10次,那么我们可以在Redis中创建一个键,并此时我们就设置键的过期时间为60秒,每一个用户对此服务接口的访问就把键值加1,在60秒内当键值增加到10的时候,就禁止访问服务接口。在某种场景中添加访问时间间隔还是很有必要的。

1)使用Redis的incr命令,将计数器作为Lua脚本

 local current
current = redis.call("incr",KEYS[])
if tonumber(current) == then
redis.call("expire",KEYS[],)
end

Lua脚本在Redis中运行,保证了incr和expire两个操作的原子性。

2)使用Reids的列表结构代替incr命令

 FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > THEN
ERROR "too many requests per second"
ELSE
IF EXISTS(ip) == FALSE
MULTI
RPUSH(ip,ip)
EXPIRE(ip,)
EXEC
ELSE
RPUSHX(ip,ip)
END
PERFORM_API_CALL()
END

Rate Limit使用Redis的列表作为容器,LLEN用于对访问次数的检查,一个事物中包含了RPUSH和EXPIRE两个命令,用于在第一次执行计数是创建列表并设置过期时间,

RPUSHX在后续的计数操作中进行增加操作。

四、基于令牌桶算法的实现

令牌桶算法可以很好的支撑突然额流量的变化即满令牌桶数的峰值。

 import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions;
import com.netease.datastream.util.framework.LifeCycle; public class TokenBucket implements LifeCycle { // 默认桶大小个数 即最大瞬间流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64; // 一个桶的单位是1字节
private int everyTokenSize = 1; // 瞬间最大流量
private int maxFlowRate; // 平均流量
private int avgFlowRate; // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE); private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private volatile boolean isStart = false; private ReentrantLock lock = new ReentrantLock(true); private static final byte A_CHAR = 'a'; public TokenBucket() {
} public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
} public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
} public void addTokens(Integer tokenNum) { // 若是桶已经满了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(Byte.valueOf(A_CHAR));
}
} public TokenBucket build() { start();
return this;
} /**
* 获取足够的令牌个数
*
* @return
*/
public boolean getTokens(byte[] dataSize) { Preconditions.checkNotNull(dataSize);
Preconditions.checkArgument(isStart, "please invoke start method first !"); int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数 final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
if (!result) {
return false;
} int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
} return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
} @Override
public void start() { // 初始化桶队列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
} // 初始化令牌生产者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
isStart = true; } @Override
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
} @Override
public boolean isStarted() {
return isStart;
} class TokenProducer implements Runnable { private int avgFlowRate;
private TokenBucket tokenBucket; public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
} @Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
} public static TokenBucket newBuilder() {
return new TokenBucket();
} public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
} public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
} public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
} private String stringCopy(String data, int copyNum) { StringBuilder sbuilder = new StringBuilder(data.length() * copyNum); for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
} return sbuilder.toString(); } public static void main(String[] args) throws IOException, InterruptedException { tokenTest();
} private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
} private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build(); BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
String data = "xxxx";// 四个字节
for (int i = 1; i <= 1000; i++) { Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write("token pass --- index:" + i1);
System.out.println("token pass --- index:" + i1);
} else {
bufferedWriter.write("token rejuect --- index" + i1);
System.out.println("token rejuect --- index" + i1);
} bufferedWriter.newLine();
bufferedWriter.flush();
} bufferedWriter.close();
} }

参考:

http://xiaobaoqiu.github.io/blog/2015/07/02/ratelimiter/

http://redisdoc.com/string/incr.html

http://www.cnblogs.com/zhengyun_ustc/archive/2012/11/17/topic1.html

由于本人经验有限,文章中难免会有错误,请浏览文章的您指正或有不同的观点共同探讨!

最新文章

  1. 加密算法中BASE64、MD5、SHA、HMAC等之间的区别
  2. like语句百分号前置会使用到索引吗?
  3. poj 3254 Corn Fields
  4. SQL优化 1
  5. 深入分析Java Web技术(2) IO
  6. node-sqlserver :微软发布的 SQL Server 的 Node.js 驱动
  7. statspack系列2
  8. AsyncTask来源分析(一)
  9. 获取上一行记录lag
  10. Clairewd’s message ekmp
  11. 刘志梅 201771010115 《面向对象程序设计(java)》 第七周学习总结
  12. C#实现office文档转换为PDF格式
  13. MUI DtPicker 显示自定义日期
  14. org.hibernate.HibernateException: connnection proxy not usable after transaction completion
  15. 完整OSW安装方法
  16. ASA 用TFTP 备份配置方法
  17. OAuth2.0网页授权 提示未关注该测试号
  18. Pow(x,n) leetcode java
  19. Redis实现分布式存储Session
  20. Computer2

热门文章

  1. java中对象的序列化和反序列化
  2. StyleCop学习笔记——默认的规则
  3. zip压缩
  4. 设置SVN hooks实现自动发布
  5. 九度oj 1530 最长不重复子串
  6. mysql查看日志
  7. AngularJs学习笔记-AngularJS权威教程学习笔记
  8. 深入浅出Spring(五) SpringMVC
  9. UIProgressView swift
  10. 使用itunes同步ios时丢失照片恢复