一次 goroutine泄露 的排查

ivansli 2022/01/12 1006℃ 0

做过程序开发的人在提起泄露这个词时,会下意识的想到: 内存 -> 内存没被释放 -> 堆内存没有被释放
做过Golang开发的人在提起泄露时,会下意识的想到: 堆/goroutine -> 堆/goroutine没被释放

背景

近段时间,在维护公司一个关于微软ASR音频转文本的服务时,发现内存会随着时间一直增长,如下图:

图中可以看到有3个爬坡的过程,是因为我们服务重启了3次

脑海中冒出的第一个念头就是:内存泄露了。
于是展开了一系列的排查分析,最终定位到具体的代码,特此记录下来分享给需要的人。

一些知识点

① 内存泄露发生在堆上

进程启动之后内存布局一般分为:代码区、数据区、堆、栈。
代码区、数据区一般是固定的。
堆、栈随着时间会动态的发生变化。
栈会自动的增长、收缩,与方法的调用有关。方法调用完毕之后,栈收缩,使用的内存会被自动释放。
堆一般需要开发显式的申请、释放(C/C++等没有垃圾回收的语言),或者由GC自动回收(Java、Go等有垃圾回收的语言)。
有时候,就算具有GC也可能会导致泄露。例如:申请的内存一直被引用,使用完毕之后,该引用仍旧存在,就会导致对应的内存迟迟得不到释放。

② 垃圾回收的策略

对于带有垃圾回收的语言,其实现方式也大同小异,一般有以下几中方式:
I. 引用计数
II. 标记、清除
III. 其他
一句话总结就是:需要一种策略区分出内存的使用情况,然后释放掉不再使用的内存。

③ 造成Go内存泄露的常见情况

I. 锁未释放
II. chan阻塞
III. 定时器使用不当(可能会造成间歇性的内存泄露)
IV. 其他

泄露排查过程

对于遇到的这个泄露问题,首先想到的是 pprof 的使用。刚好项目中引入的有对应 pprof 包:

import (
  _ "net/http/pprof"
)

func init() {
  go func(){
    http.ListenAndServe("0.0.0.0:6060", nil)
  }
}

由于生产环境使用的ip是内网ip,无法使用浏览器直接请求对应接口通过图形查看,所以只能使用命令行来验证。
第一感觉是 goroutine 发生了泄露,于是就先使用 http://localhost:6060/debug/pprof/goroutine 来查看协程的情况。

> go tool pprof http://localhost:6060/debug/pprof/goroutine

Fetching profile over HTTP from http://localhost:6060/debug/pprof/goroutine
Saved profile in /root/pprof/pprof.xxxxx-grpc.goroutine.016.pb.gz
File: xxxxxx
Build ID: 0de7821f1145eb036faf3cc5735da0e65bb76cxxx
Type: goroutine
Time: Jan 12, 2022 at 9:07am (UTC)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) 
(pprof) top
Showing nodes accounting for 518437, 100% of 518439 total
Dropped 124 nodes (cum <= 2592)
      flat  flat%   sum%        cum   cum%
    518437   100%   100%     518437   100%  runtime.gopark
         0     0%   100%     258979 49.95%  speech.SpeechRecognizer.StartContinuousRecognitionAsync.func1
         0     0%   100%     258978 49.95%  speech.SpeechRecognizer.StopContinuousRecognitionAsync.func1
         0     0%   100%     517957 99.91%  runtime.chansend
         0     0%   100%     517957 99.91%  runtime.chansend1
(pprof) 
(pprof) 
(pprof) traces runtime.gopark
File: xxxxxx
Build ID: 0de7821f1145eb036faf3cc5735da0e65bb76cce
Type: goroutine
Time: Jan 12, 2022 at 9:07am (UTC)
-----------+-------------------------------------------------------
    258979   runtime.gopark
             runtime.chansend
             runtime.chansend1
             speech.SpeechRecognizer.StartContinuousRecognitionAsync.func1
-----------+-------------------------------------------------------
    258978   runtime.gopark
             runtime.chansend
             runtime.chansend1
             speech.SpeechRecognizer.StopContinuousRecognitionAsync.func1

通过查看 goroutine 的情况,发现有大量协程被 runtime.gopark,然后通过 traces 查看 runtime.gopark 的函数栈调用情况。发现了两个罪魁祸首:

-----------+-------------------------------------------------------
    258979   runtime.gopark
             runtime.chansend
             runtime.chansend1
             speech.SpeechRecognizer.StartContinuousRecognitionAsync.func1
-----------+-------------------------------------------------------
    258978   runtime.gopark
             runtime.chansend
             runtime.chansend1
             speech.SpeechRecognizer.StopContinuousRecognitionAsync.func1

吃鲸,已经泄露了几十万个 goroutine。
通过调用栈可以找到发生泄露的地方,大致调用路径是:

speech.SpeechRecognizer.StartContinuousRecognitionAsync.func1 -> runtime.chansend1 -> runtime.chansend -> runtime.gopark
speech.SpeechRecognizer.StopContinuousRecognitionAsync.func1 -> runtime.chansend1 -> runtime.chansend -> runtime.gopark

runtime.chansend1 是阻塞的调用,协程最终被 runtime.gopark 挂起,从而导致泄露。

// go源码
//
// entry point for c <- x from compiled code
// 
// func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
// 第三个参数 block 为 true,也就是说chan没有接收方或者缓冲区不足一直在阻塞
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

speech.SpeechRecognizer.StartContinuousRecognitionAsync.func1
speech.SpeechRecognizer.StopContinuousRecognitionAsync.func1

go中使用 go func(){}启动的协程没有对应的函数名称,编译器会使用func1之类的名称代替该协程对应的方法。

speech.SpeechRecognizer.StartContinuousRecognitionAsync.func1
speech.SpeechRecognizer.StopContinuousRecognitionAsync.func1

也就是说: 包 speech 中结构体 SpeechRecognizer 的实例对象,调用其方法 StartContinuousRecognitionAsync、StopContinuousRecognitionAsync。
在方法中分别启动了新的goroutine,并在新的goroutine中使用阻塞的方式向某个chan中发送数据导致了阻塞,从而产生了泄露。

追溯对应的两个方法:

// StartContinuousRecognitionAsync asynchronously initiates continuous speech recognition operation.
func (recognizer SpeechRecognizer) StartContinuousRecognitionAsync() chan error {
    outcome := make(chan error)
    go func() {
        // Close any unfinished previous attempt
        ret := releaseAsyncHandleIfValid(&recognizer.handleAsyncStartContinuous)
        if ret == C.SPX_NOERROR {
            ret = uintptr(C.recognizer_start_continuous_recognition_async(recognizer.handle, &recognizer.handleAsyncStartContinuous))
        }
        if ret == C.SPX_NOERROR {
            ret = uintptr(C.recognizer_start_continuous_recognition_async_wait_for(recognizer.handleAsyncStartContinuous, math.MaxUint32))
        }
        releaseAsyncHandleIfValid(&recognizer.handleAsyncStartContinuous)
        if ret != C.SPX_NOERROR {
            outcome <- common.NewCarbonError(ret)
            return
        }
        outcome <- nil
    }()
    return outcome
}

// StopContinuousRecognitionAsync asynchronously terminates ongoing continuous speech recognition operation.
func (recognizer SpeechRecognizer) StopContinuousRecognitionAsync() chan error {
    outcome := make(chan error)
    go func() {
        ret := releaseAsyncHandleIfValid(&recognizer.handleAsyncStopContinuous)
        if ret == C.SPX_NOERROR {
            ret = uintptr(C.recognizer_stop_continuous_recognition_async(recognizer.handle, &recognizer.handleAsyncStopContinuous))
        }
        if ret == C.SPX_NOERROR {
            ret = uintptr(C.recognizer_stop_continuous_recognition_async_wait_for(recognizer.handleAsyncStopContinuous, math.MaxUint32))
        }
        releaseAsyncHandleIfValid(&recognizer.handleAsyncStopContinuous)
        if ret != C.SPX_NOERROR {
            outcome <- common.NewCarbonError(ret)
            return
        }
        outcome <- nil
    }()
    return outcome
}

明显可以看到,上面2个方法返回了一个非缓冲的 chan。并在新启动的 goroutine 中,return 之前试图向 chan 中写入数据。

然后,我们一起看看微软官方提供的demo代码示例

https://docs.microsoft.com/en-us/azure/cognitive-services/speech-service/get-started-speech-to-text?tabs=windowsinstall&pivots=programming-language-go

(截止到 2022-01-14) 微软demo如下:

package main

import (
    "bufio"
    "fmt"
    "os"

    "github.com/Microsoft/cognitive-services-speech-sdk-go/audio"
    "github.com/Microsoft/cognitive-services-speech-sdk-go/speech"
)

func sessionStartedHandler(event speech.SessionEventArgs) {
    defer event.Close()
    fmt.Println("Session Started (ID=", event.SessionID, ")")
}

func sessionStoppedHandler(event speech.SessionEventArgs) {
    defer event.Close()
    fmt.Println("Session Stopped (ID=", event.SessionID, ")")
}

func recognizingHandler(event speech.SpeechRecognitionEventArgs) {
    defer event.Close()
    fmt.Println("Recognizing:", event.Result.Text)
}

func recognizedHandler(event speech.SpeechRecognitionEventArgs) {
    defer event.Close()
    fmt.Println("Recognized:", event.Result.Text)
}

func cancelledHandler(event speech.SpeechRecognitionCanceledEventArgs) {
    defer event.Close()
    fmt.Println("Received a cancellation: ", event.ErrorDetails)
}

func main() {
    subscription :=  "<paste-your-speech-key-here>"
    region := "<paste-your-speech-location/region-here>"

    audioConfig, err := audio.NewAudioConfigFromDefaultMicrophoneInput()
    if err != nil {
        fmt.Println("Got an error: ", err)
        return
    }
    defer audioConfig.Close()
    config, err := speech.NewSpeechConfigFromSubscription(subscription, region)
    if err != nil {
        fmt.Println("Got an error: ", err)
        return
    }
    defer config.Close()
    speechRecognizer, err := speech.NewSpeechRecognizerFromConfig(config, audioConfig)
    if err != nil {
        fmt.Println("Got an error: ", err)
        return
    }

    defer speechRecognizer.Close()
    speechRecognizer.SessionStarted(sessionStartedHandler)
    speechRecognizer.SessionStopped(sessionStoppedHandler)
    speechRecognizer.Recognizing(recognizingHandler)
    speechRecognizer.Recognized(recognizedHandler)
    speechRecognizer.Canceled(cancelledHandler)

    // 注意这里!
    speechRecognizer.StartContinuousRecognitionAsync()
    defer speechRecognizer.StopContinuousRecognitionAsync()

    bufio.NewReader(os.Stdin).ReadBytes('\n')
}

有2行代码是罪魁祸首

speechRecognizer.StartContinuousRecognitionAsync()
defer speechRecognizer.StopContinuousRecognitionAsync()

可以看到,没有对这两个方法返回的chan进行处理,就会导致goroutine的泄露,也刚好我们项目的开发就是参考的这个demo。

于是,就给微软官方进行了沟通,微软工程师认为不会存在泄露。
对于这个demo,微软工程师认为不会泄露的原因可能是因为其写在main中,main执行完后内存就全部被释放了,哪来的泄露
而,我们的程序是常驻进程,使用的这段代码是写在某段 grpc-go stream 的逻辑中,执行完之后程序不会退出,就导致了泄露。

但是,个人认为:正是因为这段代码写在main方法中,在其退出之后内存被回收从而掩盖了泄露问题,但是究其根源泄露还是存在的。
你觉得呢?

评论啦~