基于java gRPC 1.24.2 分析

结论

  1. gRPC keepAlive是grpc框架在应用层面连接保活的一种措施。即当grpc连接上没有业务数据时,是否发送pingpong,以保持连接活跃性,不因长时间空闲而被Server或操作系统关闭
  2. gRPC keepAlive在client与server都有,client端默认关闭(keepAliveTime为Long.MAX_VALUE), server端默认打开,keepAliveTime为2小时,即每2小时向client发送一次ping
// io.grpc.internal.GrpcUtil
public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
  1. KeepAlive的管理使用类io.grpc.internal.KeepAliveManager, 用于管理KeepAlive状态,ping任务调度与执行.

Client端KeepAlive

使用入口

  1. 我们在使用io.grpc框架创建grpc连接的时候,可以设置keeplive, 例如下面:
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(String.format("grpc://%s", provider)) //
.usePlaintext() //
.defaultLoadBalancingPolicy(props.getBalancePolicy()) //
.maxInboundMessageSize(props.getMaxInboundMessageSize()) //
.keepAliveTime(1,TimeUnit.MINUTES)
.keepAliveWithoutCalls(true)
.keepAliveTimeout(10,TimeUnit.SECONDS)
.intercept(channelManager.getInterceptors()); //
  1. 其中与keepAlive相关的参数有三个,keepAliveTime,keepAliveTimeout,keepAliveWithoutCalls。这三个变量有什么作用呢?
  • keepAliveTime: 表示当grpc连接没有数据传递时,多久之后开始向server发送ping packet
  • keepAliveTimeout: 表示当发送完ping packet后多久没收到server回应算超时
  • keepAliveTimeoutCalls: 表示如果grpc连接没有数据传递时,是否keepAlive,默认为false

简要时序列表

Create & Start

NettyChannelBuilder
-----> NettyTransportFactory
---------> NettyClientTransport
-------------> KeepAliveManager & NettyClientHandler

响应各种事件

当Active、Idle、DataReceived、Started、Termination事件发生时,更改KeepAlive状态,调度发送ping任务。

Server端KeepAlive

使用入口

// 只截取关键代码,详细代码请看`NettyServerBuilder`
ServerImpl server = new ServerImpl(
this,
buildTransportServers(getTracerFactories()),
Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
notifyTarget.notifyOnBuild(server);
}
return server; // 在buildTransportServers方法中创建NettyServer
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
transportServers.add(transportServer);
}

简要时序列表

Create & Start

NettyServerBuilder
---> NettyServer
---------> NettyServerTransport
-------------> NettyServerHandler
-----------------> KeepAliveEnforcer

连接准备就绪

调用 io.netty.channel.ChannelHandler的handlerAdded方法,关于此方法的描述:

Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.
NettyServerHandler(handlerAdded)
---> 创建KeepAliveManager对象

响应各种事件

同Client

KeepAliveEnforcer

在上面Server端的简要时序图中,可以看见,server端有一个特有的io.grpc.netty.KeepAliveEnforcer

此类的作用是监控clinet ping的频率,以确保其在一个合理范围内。

package io.grpc.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue; /** Monitors the client's PING usage to make sure the rate is permitted. */
class KeepAliveEnforcer {
@VisibleForTesting
static final int MAX_PING_STRIKES = 2;
@VisibleForTesting
static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2); private final boolean permitWithoutCalls;
private final long minTimeNanos;
private final Ticker ticker;
private final long epoch; private long lastValidPingTime;
private boolean hasOutstandingCalls;
private int pingStrikes; public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);
} @VisibleForTesting
KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {
Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative"); this.permitWithoutCalls = permitWithoutCalls;
this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
this.ticker = ticker;
this.epoch = ticker.nanoTime();
lastValidPingTime = epoch;
} /** Returns {@code false} when client is misbehaving and should be disconnected. */
@CheckReturnValue
public boolean pingAcceptable() {
long now = ticker.nanoTime();
boolean valid;
if (!hasOutstandingCalls && !permitWithoutCalls) {
valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;
} else {
valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;
}
if (!valid) {
pingStrikes++;
return !(pingStrikes > MAX_PING_STRIKES);
} else {
lastValidPingTime = now;
return true;
}
} /**
* Reset any counters because PINGs are allowed in response to something sent. Typically called
* when sending HEADERS and DATA frames.
*/
public void resetCounters() {
lastValidPingTime = epoch;
pingStrikes = 0;
} /** There are outstanding RPCs on the transport. */
public void onTransportActive() {
hasOutstandingCalls = true;
} /** There are no outstanding RPCs on the transport. */
public void onTransportIdle() {
hasOutstandingCalls = false;
} /**
* Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important
* to use something like this instead of directly comparing nano times. See {@link
* System#nanoTime}.
*/
private static long compareNanos(long time1, long time2) {
// Possibility of overflow/underflow is on purpose and necessary for correctness
return time1 - time2;
} @VisibleForTesting
interface Ticker {
long nanoTime();
} @VisibleForTesting
static class SystemTicker implements Ticker {
public static final SystemTicker INSTANCE = new SystemTicker(); @Override
public long nanoTime() {
return System.nanoTime();
}
}
}
  1. 先来看pingAcceptable方法,此方法是判断是否接受client ping。
  • lastValidPingTime是上次client valid ping的时间, 连接建立时此时间等于KeepAliveEnforcer对象创建的时间。当client ping有效时,其等于当时ping的时间
  • hasOutstandingCalls其初始值为false,当连接activie时,其值为true,当连接idle时,其值为false。如果grpc调用为阻塞时调用,则调用时连接变为active,调用完成,连接变为idle.
  • permitWithoutCalls其值是创建NettyServer时传入,默认为false.
  • IMPLICIT_PERMIT_TIME_NANOS其值为常量,2h
  • minTimeNanos其值是创建NettyServer时传入,默认为5min.
  • MAX_PING_STRIKES其值为常量2
  1. resetCounters方法是当grpc当中有数据时会被调用,即有grpc调用时lastValidPingTime和pingStrikes会被重置。
  2. 如果client要想使用keepAlive,permitWithoutCalls值需要设置为true,而且cient keepAliveTime需要>=minTimeNanos

最新文章

  1. C++ std::multimap
  2. SQL Server 查询表备注信息的语句
  3. Objective-C学习笔记-第二天(1)
  4. 导出excel和PDF小结 vba
  5. android studio 出错
  6. 测试题1 IOS面试基础题
  7. IOS学习之IOS沙盒(sandbox)机制和文件操作
  8. 黑马程序员-IO(二)
  9. Jenkins 三: Jenkins CLI
  10. OpenGL —— 基础笔记
  11. (转)关闭iptables和SELinux
  12. 高橋君とカード / Tak and Cards AtCoder - 2037 (DP)
  13. Python之print(args)与sys.stdout.write(string)使用总结
  14. html标签二
  15. Java 读取 .properties 配置文件
  16. Ubuntu16.04(Linux)安装JDK
  17. Jmeter(三)断言和关联
  18. 【CSS】CSS Sprites (CSS 精灵) 技术
  19. LintCode-105.复制带随机指针的链表
  20. 寻找与网页内容相关的图片(三)网易新闻与qq空间的做法

热门文章

  1. EPIC限免提示
  2. SDIO移植
  3. KingbaseES 并行查询
  4. 对表白墙wxml文件解释
  5. 路径参数和数值校验: Path_Parameters_and_Numeric_Validations
  6. fastdfs-zyc管理FastDFS的web界面
  7. Makfile总结
  8. 小程序uni-app发起网络异步请求
  9. Gson的使用与理解
  10. Java注解(1):码农的小秘