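A Study of the gRPC (Java) keepAlive Mechanism
Analysis based on gRPC Java 1.24.2
Conclusions
- gRPC keepalive is an application-layer measure the gRPC framework uses to keep connections alive. When a connection carries no business data, it determines whether ping/pong frames are sent so that the connection stays active and is not closed by the server or the operating system after a long idle period.
- Keepalive exists on both the client and the server. On the client it is off by default (keepAliveTime is Long.MAX_VALUE). On the server it is on by default with a keepAliveTime of 2 hours, so the server pings the client once the connection has been quiet for 2 hours.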
// io.grpc.internal.GrpcUtil
public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
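- Keepalive is managed by the class io.grpc.internal.KeepAliveManager, which tracks the keepalive state and schedules and executes the ping tasks.
Client-side KeepAlive
Entry point
- When creating a gRPC connection with the io.grpc framework, keepalive can be configured, for example: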
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(String.format("grpc://%s", provider))
    .usePlaintext()
    .defaultLoadBalancingPolicy(props.getBalancePolicy())
    .maxInboundMessageSize(props.getMaxInboundMessageSize())
    .keepAliveTime(1, TimeUnit.MINUTES)
    .keepAliveWithoutCalls(true)
    .keepAliveTimeout(10, TimeUnit.SECONDS)
    .intercept(channelManager.getInterceptors());
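- Three of these parameters relate to keepalive: keepAliveTime, keepAliveTimeout, and keepAliveWithoutCalls. What does each one do?
- keepAliveTime: how long the gRPC connection may go without data before the client starts sending ping packets to the server
- keepAliveTimeout: how long the client waits for the server's reply after sending a ping packet before treating the ping as timed out (the connection is then considered dead)
- keepAliveWithoutCalls: whether keepalive pings are still sent when the connection has no active RPCs; defaults to false
Brief call sequence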
Create & Start
NettyChannelBuilder
-----> NettyTransportFactory
---------> NettyClientTransport
-------------> KeepAliveManager & NettyClientHandler
Responding to events
When an Active, Idle, DataReceived, Started, or Termination event occurs, the KeepAlive state is updated and the ping task is scheduled accordingly.
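To make the event handling more concrete, here is a minimal single-threaded sketch of the idea. It is a simplified model under stated assumptions, not the real KeepAliveManager: the class name KeepAliveSketch, its reduced set of four states, and the advanceTo method (which stands in for the scheduler by firing whatever timer is due) are all made up for illustration.

```java
import java.util.concurrent.TimeUnit;

/** Simplified single-threaded model of client keepalive; NOT the real KeepAliveManager. */
final class KeepAliveSketch {
  enum State { IDLE, PING_SCHEDULED, PING_SENT, DISCONNECTED }

  private final long keepAliveTimeNanos;
  private final long keepAliveTimeoutNanos;
  private final boolean keepAliveWithoutCalls;
  private State state = State.IDLE;
  private boolean hasActiveCalls;
  private long deadlineNanos; // when the pending timer (send ping, or give up) fires

  KeepAliveSketch(long time, long timeout, TimeUnit unit, boolean withoutCalls) {
    this.keepAliveTimeNanos = unit.toNanos(time);
    this.keepAliveTimeoutNanos = unit.toNanos(timeout);
    this.keepAliveWithoutCalls = withoutCalls;
  }

  State state() { return state; }

  /** Started: begin the timer right away only if pings are allowed without calls. */
  void onTransportStarted(long now) {
    if (keepAliveWithoutCalls) schedulePing(now);
  }

  /** Active: the first RPC started on the connection. */
  void onTransportActive(long now) {
    hasActiveCalls = true;
    if (state == State.IDLE) schedulePing(now);
  }

  /** Idle: the last RPC finished; stop pinging unless keepAliveWithoutCalls is set. */
  void onTransportIdle() {
    hasActiveCalls = false;
    if (!keepAliveWithoutCalls && state == State.PING_SCHEDULED) state = State.IDLE;
  }

  /** DataReceived: any inbound data (including a ping ack) shows the peer is alive. */
  void onDataReceived(long now) {
    if (state == State.IDLE || state == State.DISCONNECTED) return;
    if (hasActiveCalls || keepAliveWithoutCalls) schedulePing(now);
    else state = State.IDLE;
  }

  /** Termination: the transport closed, so no further pings are needed. */
  void onTransportTermination() { state = State.DISCONNECTED; }

  /** Hypothetical stand-in for the scheduler: fires every timer due at time now. */
  void advanceTo(long now) {
    while ((state == State.PING_SCHEDULED || state == State.PING_SENT)
        && now - deadlineNanos >= 0) {
      if (state == State.PING_SCHEDULED) {
        state = State.PING_SENT;              // a ping would be written here
        deadlineNanos += keepAliveTimeoutNanos;
      } else {
        state = State.DISCONNECTED;           // no reply within keepAliveTimeout
      }
    }
  }

  private void schedulePing(long now) {
    state = State.PING_SCHEDULED;
    deadlineNanos = now + keepAliveTimeNanos;
  }

  public static void main(String[] args) {
    KeepAliveSketch k = new KeepAliveSketch(60, 10, TimeUnit.SECONDS, true);
    k.onTransportStarted(0);
    k.advanceTo(TimeUnit.SECONDS.toNanos(60));
    System.out.println(k.state()); // PING_SENT
    k.advanceTo(TimeUnit.SECONDS.toNanos(70));
    System.out.println(k.state()); // DISCONNECTED
  }
}
```

With keepAliveTime of 60 s and keepAliveTimeout of 10 s, a silent peer is detected in this model about 70 s after the last inbound data: 60 s until the ping goes out, plus 10 s waiting for the reply.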
Server-side KeepAlive
Entry point
// Key code only; see `NettyServerBuilder` for the full source
ServerImpl server = new ServerImpl(
    this,
    buildTransportServers(getTracerFactories()),
    Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
  notifyTarget.notifyOnBuild(server);
}
return server;
// The NettyServer instances are created in the buildTransportServers method
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
  NettyServer transportServer = new NettyServer(
      listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
      workerEventLoopGroupPool, negotiator, streamTracerFactories,
      getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
      maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
      maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
      permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
  transportServers.add(transportServer);
}
Brief call sequence
Create & Start
NettyServerBuilder
-----> NettyServer
---------> NettyServerTransport
-------------> NettyServerHandler
-----------------> KeepAliveEnforcer
Connection ready
The handlerAdded method of io.netty.channel.ChannelHandler is called. Its Javadoc describes the method as follows:
Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.
NettyServerHandler(handlerAdded)
---> creates the KeepAliveManager object
Responding to events
Same as on the client.
KeepAliveEnforcer
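As the brief server-side call sequence above shows, the server has a class of its own, io.grpc.netty.KeepAliveEnforcer.
This class monitors how often the client pings, to make sure the rate stays within a reasonable range.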
package io.grpc.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;

/** Monitors the client's PING usage to make sure the rate is permitted. */
class KeepAliveEnforcer {
  @VisibleForTesting
  static final int MAX_PING_STRIKES = 2;
  @VisibleForTesting
  static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

  private final boolean permitWithoutCalls;
  private final long minTimeNanos;
  private final Ticker ticker;
  private final long epoch;

  private long lastValidPingTime;
  private boolean hasOutstandingCalls;
  private int pingStrikes;

  public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
    this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);
  }

  @VisibleForTesting
  KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {
    Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative");
    this.permitWithoutCalls = permitWithoutCalls;
    this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
    this.ticker = ticker;
    this.epoch = ticker.nanoTime();
    lastValidPingTime = epoch;
  }

  /** Returns {@code false} when client is misbehaving and should be disconnected. */
  @CheckReturnValue
  public boolean pingAcceptable() {
    long now = ticker.nanoTime();
    boolean valid;
    if (!hasOutstandingCalls && !permitWithoutCalls) {
      valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;
    } else {
      valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;
    }
    if (!valid) {
      pingStrikes++;
      return !(pingStrikes > MAX_PING_STRIKES);
    } else {
      lastValidPingTime = now;
      return true;
    }
  }

  /**
   * Reset any counters because PINGs are allowed in response to something sent. Typically called
   * when sending HEADERS and DATA frames.
   */
  public void resetCounters() {
    lastValidPingTime = epoch;
    pingStrikes = 0;
  }

  /** There are outstanding RPCs on the transport. */
  public void onTransportActive() {
    hasOutstandingCalls = true;
  }

  /** There are no outstanding RPCs on the transport. */
  public void onTransportIdle() {
    hasOutstandingCalls = false;
  }

  /**
   * Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important
   * to use something like this instead of directly comparing nano times. See {@link
   * System#nanoTime}.
   */
  private static long compareNanos(long time1, long time2) {
    // Possibility of overflow/underflow is on purpose and necessary for correctness
    return time1 - time2;
  }

  @VisibleForTesting
  interface Ticker {
    long nanoTime();
  }

  @VisibleForTesting
  static class SystemTicker implements Ticker {
    public static final SystemTicker INSTANCE = new SystemTicker();

    @Override
    public long nanoTime() {
      return System.nanoTime();
    }
  }
}
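- Start with the pingAcceptable method, which decides whether a client ping is accepted. A ping is valid when at least minTimeNanos has passed since lastValidPingTime; if there are no outstanding calls and permitWithoutCalls is false, the required gap is IMPLICIT_PERMIT_TIME_NANOS instead. Each invalid ping increments pingStrikes, and once pingStrikes exceeds MAX_PING_STRIKES the method returns false, meaning the client is misbehaving and should be disconnected. The fields involved are:
- lastValidPingTime: the time of the client's last valid ping. When the connection is established it equals the time the KeepAliveEnforcer object was created; whenever a client ping is valid, it is set to the time of that ping.
- hasOutstandingCalls: initially false. It becomes true when the connection turns active and false when the connection turns idle. For a blocking gRPC call, the connection becomes active when the call starts and idle again when the call completes.
- permitWithoutCalls: passed in when the NettyServer is created; defaults to false.
- IMPLICIT_PERMIT_TIME_NANOS: a constant, 2 hours.
- minTimeNanos: passed in when the NettyServer is created; defaults to 5 minutes.
- MAX_PING_STRIKES: a constant, 2.
- The resetCounters method is called when data is sent on the connection (typically HEADERS and DATA frames, per its Javadoc), so whenever there is a gRPC call, lastValidPingTime and pingStrikes are reset.
- For a client to use keepalive on a connection that has no active calls, the server's permitWithoutCalls must be set to true; otherwise pings on an idle connection are only accepted 2 hours apart. In addition, the client's keepAliveTime must be >= minTimeNanos.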
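The strike counting is easier to follow with concrete numbers. The sketch below is a trimmed, stdlib-only replica of the pingAcceptable logic shown above, driven by a hand-advanced clock instead of a Ticker. The class name EnforcerDemo and its now field are hypothetical and exist only for this illustration; the scenario assumes the default minTimeNanos of 5 minutes and a client that pings every minute while a call is outstanding.

```java
import java.util.concurrent.TimeUnit;

/** Stdlib-only replica of the pingAcceptable() logic with a hand-driven clock (illustration only). */
final class EnforcerDemo {
  static final int MAX_PING_STRIKES = 2;
  static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

  private final boolean permitWithoutCalls;
  private final long minTimeNanos;
  private final long epoch;
  private long lastValidPingTime;
  private boolean hasOutstandingCalls;
  private int pingStrikes;
  long now; // hypothetical fake clock in nanoseconds, advanced by the caller

  EnforcerDemo(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
    this.permitWithoutCalls = permitWithoutCalls;
    this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
    this.epoch = now;
    this.lastValidPingTime = epoch;
  }

  /** Same decision as the original: false means the client should be disconnected. */
  boolean pingAcceptable() {
    long requiredGap = (!hasOutstandingCalls && !permitWithoutCalls)
        ? IMPLICIT_PERMIT_TIME_NANOS
        : minTimeNanos;
    if (now - (lastValidPingTime + requiredGap) >= 0) {
      lastValidPingTime = now; // valid ping: remember when it arrived
      return true;
    }
    pingStrikes++;             // ping arrived too early
    return pingStrikes <= MAX_PING_STRIKES;
  }

  void onTransportActive() { hasOutstandingCalls = true; }

  void onTransportIdle() { hasOutstandingCalls = false; }

  void resetCounters() {
    lastValidPingTime = epoch;
    pingStrikes = 0;
  }

  public static void main(String[] args) {
    EnforcerDemo enforcer = new EnforcerDemo(false, 5, TimeUnit.MINUTES);
    enforcer.onTransportActive();
    for (int i = 1; i <= 3; i++) {
      enforcer.now += TimeUnit.MINUTES.toNanos(1); // client pings once per minute
      System.out.println("ping " + i + " acceptable: " + enforcer.pingAcceptable());
    }
    // ping 1 acceptable: true   (strike 1)
    // ping 2 acceptable: true   (strike 2)
    // ping 3 acceptable: false  (strike 3 exceeds MAX_PING_STRIKES)
  }
}
```

In this scenario the first two early pings are tolerated as strikes and the third is rejected. Spacing the pings at least 5 minutes apart, or having data sent in between so that resetCounters runs, keeps the strike count from accumulating.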