前言

RPC 服务中,每个服务的容量都是有限的,即资源有限,只能承受住给定的网络请求,所以,在设计 RPC 框架的时候,一定要考虑流量控制这个问题。而 Java 中,实现流量控制有很多中方式,今天说 2 种。

Semaphore 实现流控

代码:

  static Semaphore semaphore = new Semaphore(100);

  public static void main(String[] args) {

    Executor timeTask = Executors.newScheduledThreadPool(1);
((ScheduledExecutorService) timeTask).scheduleAtFixedRate(
() -> semaphore.release(100 - semaphore.availablePermits()), 1000, 1000,
TimeUnit.MILLISECONDS); Executor pool = Executors.newFixedThreadPool(100); for (int i = 0; i < 100; i++) {
final int num = i;
pool.execute(() -> {
for (; ; ) {
for (int j = 0; j < 200; j++) {
if (semaphore.tryAcquire()) {
callRpc(num, j);
} else {
System.err.println("call fail");
}
}
}
});
}
} private static void callRpc(int num, int j) {
System.out.println(String.format("%s - %s: %d %d", new Date(), Thread.currentThread(), num, j));
}

代码中,我们模拟了 100 个线程,每个线程无限调用 RPC。

同时使用另一个定时任务,定时更新 Semaphore 可用许可为 100。

客户端线程调用时,会尝试获取信号量,当获取成功时,才会调用调用 RPC,反之,打印失败。

这个小程序实现了每秒钟限制 100 个请求的 RPC 的流量控制。

AtomicInteger 实现流控

代码:

  static AtomicInteger count = new AtomicInteger();

  public static void main(String[] args) {
Executor timeTask = Executors.newScheduledThreadPool(1);
((ScheduledExecutorService) timeTask).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
count.getAndSet(100);
}
}, 1000, 1000, TimeUnit.MILLISECONDS); Executor pool = Executors.newFixedThreadPool(100); for (int i = 0; i < 100; i++) {
final int num = i;
pool.execute(() -> {
for (; ; ) {
for (int j = 0; j < 200; j++) {
if (count.get() >= 0) {// 快速判断,否则大量的 CAS 操作将会定时任务更新计数器 count
if (count.decrementAndGet() >= 0) {
callRpc(num, j);
}
}
}
}
});
}
} private static void callRpc(int num, int j) {
System.out.println(String.format("%s - %s: %d %d", new Date(), Thread.currentThread(), num, j));
}

这段代码和上面的类似,只是使用的 API 不同,这里使用的是 CAS。通过对 CAS 递减,达到流控的目的。

注意,这里有一个双重判断,先判断 count.get() >= 0,为什么呢?

如果直接使用 decrementAndGet 方法,则会使用 CAS,100 个线程并发使用 CAS ,将会导致定时任务的 CAS 操作不够及时。

所以,先判断,是否小于0 ,如果小于0了,就不必尝试 CAS,避免影响定时任务。

最新文章

  1. C#基础-压缩文件及故障排除
  2. Codeforces Round #381 (Div. 2)C. Alyona and mex(思维)
  3. WebServices(转)
  4. sql优化建议
  5. Docker学习资料
  6. hibernate 知识梳理
  7. poj3480--John
  8. HibernateTemplate类的方法flush()
  9. 百度apistore第三方登陆使用说明
  10. session对象和applicatione对象
  11. eval、json.parse()的介绍和使用注意点
  12. 自动生成Makefile文件
  13. 4.28Linux(6)
  14. python 多线程threading的学习一
  15. 学习使用scrapy itemspipeline过程
  16. scrapy使用MongoDB简单示例
  17. ones测试用例管理平台
  18. Python的第二次作业
  19. PHP mysqli_free_result()与mysqli_fetch_array()函数
  20. SQLServer 学习相关资料整理【转】

热门文章

  1. Delphi中Unicode转中文
  2. ELK冷热数据分离
  3. Canvas教程
  4. 三、winForm-DataGridView操作——DataGridView 操作复选框checkbox
  5. 定时任务 Wpf.Quartz.Demo.3
  6. MVC所有的ActionResult
  7. this与$(this)对象
  8. 【BZOJ3160】 万径人踪灭(FFT,manacher)
  9. 实现域名访问网站—nginx反向代理
  10. cad 关键字被保留了?选择集关键字保留了? N S W E关键字无法用?