grpc-go设置keepalive

ivansli 2022/02/05 1200℃ 0

每个grpc请求都是 stream,Keepalive 能够让 grpc 的每个 stream 保持长连接状态,适合一些执行时间长的请求。Keepalive 支持在服务端和客户端配置,且只有服务端配置后,客户端的配置才会真正有效。

server 端实现

https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/server/main.go

keepalive配置参数是针对整个连接的
grpc-go基于http/2实现,可以多路复用。即:多个请求复用同一个连接,每个请求都是一个单独的stream

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "net"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"

    pb "google.golang.org/grpc/examples/features/proto/echo"
)

// port is the TCP port the server listens on (flag -port, default 50052).
var port = flag.Int("port", 50052, "port number")

// kaep is the keepalive enforcement policy the server applies to client pings.
var kaep = keepalive.EnforcementPolicy{
    // If a client pings more than once every 5 seconds, terminate the connection.
    MinTime: 5 * time.Second,
    // Allow pings even when there are no active streams.
    PermitWithoutStream: true,
}

// kasp holds the server-side keepalive parameters; they are passed to
// grpc.KeepaliveParams when the server is created.
var kasp = keepalive.ServerParameters{
    // If a client is idle for 15 seconds, send a GOAWAY.
    MaxConnectionIdle: 15 * time.Second,
    // If any connection is alive for more than 30 seconds, send a GOAWAY.
    MaxConnectionAge: 30 * time.Second,
    // Allow 5 seconds for pending RPCs to complete before forcibly closing connections.
    MaxConnectionAgeGrace: 5 * time.Second,
    // Ping the client if it is idle for 5 seconds to ensure the connection is still active.
    Time: 5 * time.Second,
    // Wait 1 second for the ping ack before assuming the connection is dead.
    Timeout: 1 * time.Second,
}

// server is the EchoServer implementation used by this example.
type server struct {
    pb.UnimplementedEchoServer
}

// UnaryEcho replies with a response whose Message is copied from the request.
func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
    reply := &pb.EchoResponse{Message: req.Message}
    return reply, nil
}

// main parses the command-line flags, opens a TCP listener on the configured
// port, and serves the Echo service with the keepalive enforcement policy
// (kaep) and server parameters (kasp) installed.
func main() {
    flag.Parse()

    lis, err := net.Listen("tcp", fmt.Sprintf(":%v", *port))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // Keepalive options are applied to the whole server at construction time.
    opts := []grpc.ServerOption{
        grpc.KeepaliveEnforcementPolicy(kaep),
        grpc.KeepaliveParams(kasp),
    }
    srv := grpc.NewServer(opts...)
    pb.RegisterEchoServer(srv, &server{})

    err = srv.Serve(lis)
    if err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

执行

go run server/main.go

server keepalive 的实现核心在于 keepalive.ServerParameters 和 keepalive.EnforcementPolicy

首先是 keepalive.ServerParameters,包含以下几个属性:

  • MaxConnectionIdle:最大空闲连接时间,默认为无限制。超出这段时间后,server 发送 GOAWAY,强制 client stream 断开
  • MaxConnectionAge:最大连接时间,默认为无限制。stream 连接超出这个值时发送一个 GOAWAY
  • MaxConnectionAgeGrace:超出MaxConnectionAge之后的宽限时长,默认无限制
  • Time:如果一段时间客户端存活但没有 pings 请求,服务端发送一次 ping 请求,默认是 2hour (若设置值小于 1s,则按最小值 1s 处理)
  • Timeout:服务端发送 ping 请求超时的时间,默认20s

    即:在发送ping包之后,Timeout 时间内没有收到 ack 则视为超时

keepalive.EnforcementPolicy 为服务端强制执行策略,如果客户端违反则断开连接。它有两个属性:

  • MinTime : 如果在指定时间内收到 pings 次数大于一次,强制断开连接,默认 5min

    防止客户端在一段时间间隔内发送太频繁 ping 操作

  • PermitWithoutStream:没有活动的 stream 也允许pings。默认关闭

    是否允许没有流时发送 ping

注意:
若设置 MaxConnectionAge,而没设置 MaxConnectionAgeGrace,则达到 MaxConnectionAge 后服务端只会发送 GOAWAY 进行优雅关闭,而不会强制断开连接(宽限时长默认为 infinity),因此看不到连接被强制关闭的效果。
原因见下面代码:

// grpc-go source file & location
// internal/transport/defaults.go

// Default values used by the transport: flow-control window sizes, keepalive
// timings for client and server, stream limits, and header-list size limits.
const (
    // The default value of flow control window size in HTTP2 spec.
    defaultWindowSize = 65535
    // The initial window size for flow control.
    initialWindowSize             = defaultWindowSize // for an RPC
    // infinity is the largest time.Duration value; every default below that
    // is set to it is effectively "no limit".
    infinity                      = time.Duration(math.MaxInt64)
    defaultClientKeepaliveTime    = infinity
    defaultClientKeepaliveTimeout = 20 * time.Second
    defaultMaxStreamsClient       = 100
    // Fallbacks applied when the corresponding ServerParameters field is
    // left at zero.
    defaultMaxConnectionIdle      = infinity
    defaultMaxConnectionAge       = infinity
    defaultMaxConnectionAgeGrace  = infinity
    defaultServerKeepaliveTime    = 2 * time.Hour
    defaultServerKeepaliveTimeout = 20 * time.Second
    defaultKeepalivePolicyMinTime = 5 * time.Minute
    // max window limit set by HTTP2 Specs.
    maxWindowSize = math.MaxInt32
    // defaultWriteQuota is the default value for number of data
    // bytes that each stream can schedule before some of it being
    // flushed out.
    defaultWriteQuota              = 64 * 1024
    defaultClientMaxHeaderListSize = uint32(16 << 20)
    defaultServerMaxHeaderListSize = uint32(16 << 20)
)


// grpc-go source file & location
// internal/transport/http2_server.go

// NewServerTransport (excerpt; elided parts are marked with "......").
// The portion shown fills in defaults for unset keepalive parameters and
// starts the keepalive goroutine before returning the transport.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
    ......

    kp := config.KeepaliveParams

    // Maximum idle time: a zero value falls back to the default.
    if kp.MaxConnectionIdle == 0 {
      kp.MaxConnectionIdle = defaultMaxConnectionIdle
    }
    // Maximum connection age: a zero value falls back to the default.
    if kp.MaxConnectionAge == 0 {
      kp.MaxConnectionAge = defaultMaxConnectionAge
    }
    // Add a jitter to MaxConnectionAge.
    // The value returned by getJitter is added on top of the configured age.
    kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)

    // Grace period after MaxConnectionAge: a zero value falls back to
    // defaultMaxConnectionAgeGrace.
    if kp.MaxConnectionAgeGrace == 0 {
      kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
    }

    ......

    // Run the keepalive loop in its own goroutine.
    go t.keepalive()

    return t, nil
}


// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
    p := &ping{}

    // True iff a ping has been sent, and no data has been received since then.
    outstandingPing := false

    // Amount of time remaining before which we should receive an ACK for the
    // last sent ping.
    kpTimeoutLeft := time.Duration(0)

    // Records the last value of t.lastRead before we go block on the timer.
    // This is required to check for read activity since then.
    prevNano := time.Now().UnixNano()

    // Initialize the different timers to their default values.
    // One timer each for idleness, connection age, and keepalive pings.
    idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
    ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
    kpTimer := time.NewTimer(t.kp.Time)

  // Stop all three timers in a deferred function when this goroutine exits.
    defer func() {
        // We need to drain the underlying channel in these timers after a call
        // to Stop(), only if we are interested in resetting them. Clearly we
        // are not interested in resetting them here.
        idleTimer.Stop()
        ageTimer.Stop()
        kpTimer.Stop()
    }()

    for {
        select {
        case <-idleTimer.C: // idle timer fired
            t.mu.Lock()
            idle := t.idle
            if idle.IsZero() { // The connection is non-idle.
                t.mu.Unlock()
                idleTimer.Reset(t.kp.MaxConnectionIdle)
                continue
            }

            // Time still remaining before the idle limit is reached.
            val := t.kp.MaxConnectionIdle - time.Since(idle)
            t.mu.Unlock()
            if val <= 0 {
                // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
                // Gracefully close the connection.
                t.Drain()
                return
            }

            idleTimer.Reset(val)

        case <-ageTimer.C: // max-connection-age timer fired
            t.Drain()

            // Re-arm the same timer with MaxConnectionAgeGrace.
            // If MaxConnectionAge was set but MaxConnectionAgeGrace was not,
            // MaxConnectionAgeGrace is defaultMaxConnectionAgeGrace = infinity.
            // Because infinity is an extremely large value, the forced close
            // below is never reached in practice, so nothing visible happens
            // after MaxConnectionAge beyond the Drain above.
            ageTimer.Reset(t.kp.MaxConnectionAgeGrace)

            select {
            case <-ageTimer.C:
                // Close the connection after grace period.
                if logger.V(logLevel) {
                    logger.Infof("transport: closing server transport due to maximum connection age.")
                }
                t.Close()
            case <-t.done:
            }
            return

        case <-kpTimer.C: // keepalive timer fired: send a ping frame if the client has shown no activity
            lastRead := atomic.LoadInt64(&t.lastRead)
            if lastRead > prevNano {
                // There has been read activity since the last time we were
                // here. Setup the timer to fire at kp.Time seconds from
                // lastRead time and continue.
                outstandingPing = false
                kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
                prevNano = lastRead
                continue
            }

            // A ping is outstanding and its ack window has been used up:
            // close the transport.
            if outstandingPing && kpTimeoutLeft <= 0 {
                if logger.V(logLevel) {
                    logger.Infof("transport: closing server transport due to idleness.")
                }
                t.Close()
                return
            }

            // No ping in flight: queue one and start the ack window.
            if !outstandingPing {
                if channelz.IsOn() {
                    atomic.AddInt64(&t.czData.kpCount, 1)
                }
                t.controlBuf.put(p)
                kpTimeoutLeft = t.kp.Timeout
                outstandingPing = true
            }
            // The amount of time to sleep here is the minimum of kp.Time and
            // timeoutLeft. This will ensure that we wait only for kp.Time
            // before sending out the next ping (for cases where the ping is
            // acked).
            sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
            kpTimeoutLeft -= sleepDuration
            kpTimer.Reset(sleepDuration)

        case <-t.done: // transport already closed; exit
            return
        }
    }
}

client 端实现

https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/client/main.go

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "time"

    "google.golang.org/grpc"
    pb "google.golang.org/grpc/examples/features/proto/echo"
    "google.golang.org/grpc/keepalive"
)

// addr is the server address the client dials (flag -addr, default localhost:50052).
var addr = flag.String("addr", "localhost:50052", "the address to connect to")

// kacp holds the client-side keepalive parameters passed to
// grpc.WithKeepaliveParams when dialing.
var kacp = keepalive.ClientParameters{
    // Send pings every 10 seconds if there is no activity.
    Time: 10 * time.Second,
    // Wait 1 second for ping ack before considering the connection dead.
    Timeout: 1 * time.Second,
    // Send pings even without active streams.
    PermitWithoutStream: true,
}

// main dials the server with the client keepalive parameters (kacp), issues
// one UnaryEcho request, prints the response, and then blocks forever so the
// keepalive traffic on the open connection can be observed.
func main() {
    flag.Parse()

    dialOpts := []grpc.DialOption{
        grpc.WithInsecure(),
        grpc.WithKeepaliveParams(kacp),
    }
    conn, err := grpc.Dial(*addr, dialOpts...)
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewEchoClient(conn)

    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
    defer cancel()

    fmt.Println("Performing unary request")
    request := &pb.EchoRequest{Message: "keepalive demo"}
    res, err := client.UnaryEcho(ctx, request)
    if err != nil {
        log.Fatalf("unexpected error from UnaryEcho: %v", err)
    }
    fmt.Println("RPC response:", res)

    // Block forever; run with GODEBUG=http2debug=2 to observe ping frames and
    // GOAWAYs due to idleness.
    select {}
}

执行

GODEBUG=http2debug=2 go run client/main.go

keepalive.ClientParameters是在客户端使用的 keepalive 配置:

  • Time :ping 请求间隔时间,默认无限制,最小为 10s
  • Timeout :ping 超时时间,默认是 20s

    即:在发送ping包之后,Timeout 时间内没有收到 ack 则视为超时

  • PermitWithoutStream:没有活动的 stream 也允许pings。默认关闭

更多细节&设置,详见 https://github.com/grpc/grpc-go/tree/master/Documentation/keepalive.md

其他参考

Golang

评论啦~