我正在运行一个Golang TCP服务器,它接收一个连接,生成一个goroutine来处理该连接,然后生成一个新的goroutine从连接中读取数据并将数据写入通道。还有一个从通道读取数据的简单功能。
其想法是,如果客户端关闭连接
从连接读取
goroutine将在读取时收到一个错误,然后返回。defer代码将写入
完成
通道并关闭两个
完成
和
队列
通道。如果
完成
通道有数据,并返回当时的任何结果。
另一种情况是,消费者可以决定是否有足够的数据来做出决定并返回。这将导致执行返回到
手柄
作用当该函数返回时
手柄
函数会关闭连接,导致
从连接读取
goroutine在从连接读取时接收错误,并关闭所有挂起的通道。
这很好,但我正在进行一些负载测试,并注意到测试完成后Golang服务器的内存使用量并没有像负载停止时应用程序的所有其他部分那样减少。我拿了一些身份证并查看了日志。我明白了
有时
(并非一直如此)
从CONN读取时出错
显示消息,但没有
READ_FROM_CONN DEFER
log,所以我认为defer永远不会被调用。因此
处理队列
挂起通道的读取,因为它从未关闭,并且
正在关闭。。。
的日志
手柄
函数也丢失了。
至少,我认为这就是正在发生的事情,我相信这就是为什么当负载测试结束时,内存消耗永远不会下降的原因,因为代码仍在运行一些goroutines,从应该由defer代码关闭的通道中读取
从连接读取
。这种行为是不可预测的;并不是所有的连接都会发生这种情况,所以我不知道会出什么问题。
这是我的Golang服务器的简化版本:
package main
import (
"os"
"net"
"fmt"
"io"
)
type CustomStruct struct {
Type string
Stop bool
}
func main() {
// Creates server
server, err := net.Listen("tcp", "0.0.0.0:80")
if err != nil {
fmt.Println("failed to bind listener to socket", err)
}
defer server.Close()
fmt.Println("Listening new connections V2")
// Starts reading from the server
for {
conn, err := server.Accept()
if err != nil {
fmt.Println("failed to accept new connection:", err)
continue
}
go Handle(conn)
}
}
func Handle(conn net.Conn) {
defer conn.Close()
id := "some uuid for each conn"
// Creates channels
queue := make(chan []byte, 512)
done := make(chan bool)
// Starts reading from the server
go ReadFromConn(id, conn, queue, done)
result := ProcessQueue(id, queue, done)
fmt.Println(id, "CLOSING...")
// Do stuffs with result...
fmt.Println(id, result)
}
func ReadFromConn(
id string,
conn io.Reader,
queue chan []byte,
done chan bool,
) {
defer func() {
done <- true
close(queue)
close(done)
fmt.Println(id, "READ_FROM_CONN DEFER")
}()
tmp := make([]byte, 256)
for {
_, err := conn.Read(tmp)
if err != nil {
fmt.Println(id, "ERROR READING FROM CONN " + err.Error())
return
}
if (tmp[0] == 0x00) {
return
}
queue <- tmp
}
}
func ProcessQueue(
id string,
queue chan []byte,
done chan bool,
) CustomStruct {
defer fmt.Println(id, "GET_TRANSCRIPTION_RESULT ENDED")
fmt.Println(id, "GET_TRANSCRIPTION_RESULT STARTED")
result := CustomStruct{
Type: "transcription",
Stop: false,
}
for {
select {
case <-done:
fmt.Println(id, "DONE DETECTED")
return result
default:
fmt.Println(id, "DEFAULT")
payload, open := <-queue
if open == false {
fmt.Println(id, "QUEUE IS CLOSED")
return result
} else {
fmt.Println(id, "QUEUE IS OPEN")
}
// ... Do stuffs with payload, if certain condition is met, of the result of processing payload, return
if (payload[0] == 0x01) {
return result
}
}
}
return result
}