代码之家  ›  专栏  ›  技术社区  ›  lHumanizado

Golang defer并不总是执行

  •  -1
  • lHumanizado  · 技术社区  · 2 年前

    我正在运行一个Golang TCP服务器,它接收一个连接,生成一个goroutine来处理该连接,然后生成一个新的goroutine从连接中读取数据并将数据写入通道。还有一个从通道读取数据的简单功能。

    其想法是,如果客户端关闭连接 从连接读取 goroutine将在读取时收到一个错误,然后返回。defer代码将写入 完成 通道并关闭两个 完成 队列 通道。如果 完成 通道有数据,并返回当时的任何结果。

    另一种情况是,消费者可以决定是否有足够的数据来做出决定并返回。这将导致执行返回到 手柄 作用当该函数返回时 手柄 函数会关闭连接,导致 从连接读取 goroutine在从连接读取时接收错误,并关闭所有挂起的通道。

    这很好,但我正在进行一些负载测试,并注意到测试完成后Golang服务器的内存使用量并没有像负载停止时应用程序的所有其他部分那样减少。我拿了一些身份证并查看了日志。我明白了 有时 (并非一直如此) 从CONN读取时出错 显示消息,但没有 READ_FROM_CONN DEFER log,所以我认为defer永远不会被调用。因此 处理队列 挂起通道的读取,因为它从未关闭,并且 正在关闭。。。 的日志 手柄 函数也丢失了。

    至少,我认为这就是正在发生的事情,我相信这就是为什么当负载测试结束时,内存消耗永远不会下降的原因,因为代码仍在运行一些goroutines,从应该由defer代码关闭的通道中读取 从连接读取 。这种行为是不可预测的;并不是所有的连接都会发生这种情况,所以我不知道会出什么问题。

    这是我的Golang服务器的简化版本:

    package main
    
    import (
      "os"
      "net"
      "fmt"
      "io"
    )
    
    type CustomStruct struct {
      Type string
      Stop bool
    }
    
    func main() {
      // Creates server
      server, err := net.Listen("tcp", "0.0.0.0:80")
      if err != nil {
        fmt.Println("failed to bind listener to socket", err)
      }
      defer server.Close()
      fmt.Println("Listening new connections V2")
    
      // Starts reading from the server
      for {
        conn, err := server.Accept()
        if err != nil {
          fmt.Println("failed to accept new connection:", err)
          continue
        }
    
        go Handle(conn)
      }
    }
    
    func Handle(conn net.Conn) {
      defer conn.Close()
    
      id := "some uuid for each conn"
    
      // Creates channels
      queue := make(chan []byte, 512)
      done := make(chan bool)
    
      // Starts reading from the server
      go ReadFromConn(id, conn, queue, done)
    
      result := ProcessQueue(id, queue, done)
    
      fmt.Println(id, "CLOSING...")
    
      // Do stuffs with result...
      fmt.Println(id, result)
    }
    
    func ReadFromConn(
      id string,
      conn io.Reader,
      queue chan []byte,
      done chan bool,
    ) {
      defer func() {
        done <- true
        close(queue)
        close(done)
        fmt.Println(id, "READ_FROM_CONN DEFER")
      }()
    
      tmp := make([]byte, 256)
      for {
        _, err := conn.Read(tmp)
        if err != nil {
          fmt.Println(id, "ERROR READING FROM CONN " + err.Error())
          return
        }
        if (tmp[0] == 0x00) {
          return
        }
        queue <- tmp
      }
    }
    
    func ProcessQueue(
      id string,
      queue chan []byte,
      done chan bool,
    ) CustomStruct {
      defer fmt.Println(id, "GET_TRANSCRIPTION_RESULT ENDED")
      fmt.Println(id, "GET_TRANSCRIPTION_RESULT STARTED")
    
      result := CustomStruct{
        Type: "transcription",
        Stop: false,
      }
    
      for {
        select {
        case <-done:
          fmt.Println(id, "DONE DETECTED")
          return result
        default:
          fmt.Println(id, "DEFAULT")
          payload, open := <-queue
          if open == false {
            fmt.Println(id, "QUEUE IS CLOSED")
            return result
          } else {
            fmt.Println(id, "QUEUE IS OPEN")
          }
          // ... Do stuffs with payload, if certain condition is met, of the result of processing payload, return
          if (payload[0] == 0x01) {
            return result
          }
        }
      }
    
      return result
    }
    
    0 回复  |  直到 2 年前
        1
  •  1
  •   flakes    2 年前

    不完全确定问题是什么,但使用上下文对象可以更干净地控制阅读器的寿命。上下文避免了必须密切管理通道对象,并提供了一种干净的方法来报告goroutine中的错误(如果需要),使用 context.Cause(ctx) .

    示例设置可能如下所示:

    ctx, cancel := context.WithCancelCause(context.Background())
    queue := make(chan []byte, 512)
    go ReadFromConn(id, conn, queue, ctx, cancel)
    result := ProcessQueue(id, queue, ctx, cancel)
    

    defer调用可能是在上下文原因中放置一个错误,从而导致 ctx.Done 方法,现在返回一个存在值的通道。这种方法也是可重复的,这意味着当使用简单通道方法时,多个goroutine都可以接收一个值,而不是仅接收一个。

    发送到队列时,可以选择 ctx。多恩 以及防止在不再被读取的队列上永远等待。类似地,您也可以对从队列中读取执行此操作。

    func ReadFromConn(...) {
      defer cancel(fmt.Errorf("reader defer"))
      ...
      for {
        ...
        select {
        case queue <- tmp:
        case <-ctx.Done():
            return
        }
      }
    }
    
    func ProcessQueue(...) {
      defer cancel(fmt.Errorf("queue defer"))
      ...
      for {
        select {
        case <-ctx.Done():
          ...
        case payload := <-queue:
          ...
        }
      }
    
    推荐文章