• 主页
  • 架构
  • 编程语言
  • 数据存储
  • 网络
  • VMware
  • 服务器
  • 组网
  • AI
  • 算法系列
  • 设计模式
  • 读书笔记
  • 思考
  • 工具
  • 其它技术

  • 主页
  • 架构
  • 编程语言
  • 数据存储
  • 网络
  • VMware
  • 服务器
  • 组网
  • AI
  • 算法系列
  • 设计模式
  • 读书笔记
  • 思考
  • 工具
  • 其它技术

go语法大赏

2025-05-18

前些日子单机房稳定性下降,找了好一会才找到真正的原因。这里面涉及到不少go语法细节,正好大家一起看一下。

一、仿真代码

这是仿真之后的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package main

import (
"fmt"
"go.uber.org/atomic"
"time"
)

type StopSignal struct{}

// RecvChannel is the wrapped channel for recv side.
type RecvChannel[T any] struct {
// Data will be passed through the result channel.
DataChannel <-chan T
// Error will be passed through the error channel.
ErrorChannel <-chan error
// Stop signal will be passed through the stop signal channel,
// when signal is sent or channel is closed, it means recv side requires send side to stop sending data.
StopChannel chan<- StopSignal
stopped *atomic.Bool
}

// Close sends stop signal to the sender side.
func (c *RecvChannel[T]) Close() {
if !c.stopped.CompareAndSwap(false, true) {
return
}
close(c.StopChannel)
}

// Stopped returns whether the stop signal has been sent.
func (c *RecvChannel[T]) Stopped() bool {
return c.stopped.Load()
}

// GetError returns the last error, it waits at most 1s if the error channel is not closed.
func (c *RecvChannel[T]) GetError() error {
select {
case err := <-c.ErrorChannel:
return err
case <-time.After(time.Second):
return nil
}
}

// SendChannel is the wrapped channel for sender side.
type SendChannel[T any] struct {
// Data will be passed through the result channel.
DataChannel chan<- T
// Error will be passed through the error channel.
ErrorChannel chan<- error
// Stop signal will be passed through the stop signal channel,
// when signal is sent or channel is closed, it means recv side requires send side to stop sending data.
StopChannel <-chan StopSignal
stopped *atomic.Bool
}

// Close closes the result channel and error channel, so the recv will know the sending has been stopped.
func (c *SendChannel[T]) Close() {
close(c.DataChannel)
close(c.ErrorChannel)
c.stopped = atomic.NewBool(true)
}

// Stopped returns whether the stop signal has been sent.
func (c *SendChannel[T]) Stopped() bool {
return c.stopped.Load()
}

// Publish sends data to the data channel, does nothing if it is closed.
func (c *SendChannel[T]) Publish(t T) {
if c.Stopped() {
return
}
select {
case <-c.StopChannel:
case c.DataChannel <- t:
}
}

func (c *SendChannel[T]) PublishError(err error, close bool) {
if c.Stopped() {
return
}
select {
case <-c.StopChannel:
case c.ErrorChannel <- err:
}
if close {
c.Close()
}
}

func NewChannel[T any](bufSize int) (*SendChannel[T], *RecvChannel[T]) {
resultC := make(chan T, bufSize)
errC := make(chan error, 1)
stopC := make(chan StopSignal, 1)
stopped := atomic.NewBool(false)
sc := &SendChannel[T]{
DataChannel: resultC,
ErrorChannel: errC,
StopChannel: stopC,
stopped: stopped,
}
rc := &RecvChannel[T]{
DataChannel: resultC,
ErrorChannel: errC,
StopChannel: stopC,
stopped: stopped,
}

return sc, rc
}

// SliceToChannel creates a channel and sends the slice's items into it.
// It ignores if the item in the slices is not a type T or error.
func SliceToChannel[T any](size int, s []any) *RecvChannel[T] {
sc, rc := NewChannel[T](size)

go func() {
for _, item := range s {
if sc.Stopped() {
sc.Close()
return
}
switch v := item.(type) {
case T:
sc.DataChannel <- v
case error:
sc.ErrorChannel <- v
default:
continue
}

}
sc.Close()
}()

return rc
}

// /////////////// 真正的处理逻辑
func Process(send *SendChannel[int]) {
defer func() {
if send != nil {
fmt.Println("3 Process close defer")
send.Close()
}
}()
go func() {
for {
select {
case <-send.StopChannel:
fmt.Println("2 Process stop channel")
send.Close()
return
}
}
}()
send.ErrorChannel <- fmt.Errorf("0 Start error \n")
fmt.Println("0 Start error")
time.Sleep(1 * time.Second)
}

func main() {
send, recv := NewChannel[int](10)
go func() {
Process(send)
}()
for {
fmt.Println("only once")
select {
case <-recv.ErrorChannel:
fmt.Println("1 recv errorchannel ")
recv.Close()
break
}
break
}

//panic(1)
time.Sleep(5 * time.Second)
}

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
➜  my go run main.go
only once
0 Start error
1 recv errorchannel
2 Process stop channel
3 Process close defer
panic: close of closed channel

goroutine 21 [running]:
main.(*SendChannel[...]).Close(...)
/Users/bytedance/My/work/go/my/main.go:60
main.Process.func1()
/Users/bytedance/My/work/go/my/main.go:147 +0x6c
main.Process(0x14000092020)
/Users/bytedance/My/work/go/my/main.go:163 +0x118
main.main.func1()
/Users/bytedance/My/work/go/my/main.go:168 +0x20
created by main.main in goroutine 1
/Users/bytedance/My/work/go/my/main.go:167 +0x70
exit status 2

不知道大家是否能够比较快的看出来问题。

二、相关语法

2.1channel

知识点

在 Go 语言中,channel是用于在多个goroutine之间进行通信和同步的重要机制,以下是一些关于channel的重要知识点:

1. 基本概念
  • 定义:channel可以被看作是一个类型安全的管道,用于在goroutine之间传递数据,遵循 CSP(Communicating Sequential Processes)模型,即 “通过通信来共享内存,而不是通过共享内存来通信”,从而避免了传统共享内存并发编程中的数据竞争等问题。
  • 声明与创建:使用make函数创建,语法为make(chan 数据类型, 缓冲大小)。缓冲大小是可选参数,省略时创建的是无缓冲channel;指定大于 0 的缓冲大小时创建的是有缓冲channel。例如:
1
2
unbufferedChan := make(chan int)      // 无缓冲channel
bufferedChan := make(chan int, 10) // 有缓冲channel,缓冲大小为10
2. 操作方式
  • 发送数据:使用<-操作符将数据发送到channel中,语法为channel <- 数据。例如:
1
2
3
4
ch := make(chan int)
go func() {
ch <- 42 // 发送数据42到ch中
}()
  • 接收数据:同样使用<-操作符从channel中接收数据,有两种形式。一种是将接收到的数据赋值给变量,如数据 := <-channel;另一种是只接收数据不赋值,如<-channel。例如:
1
2
3
4
5
ch := make(chan int)
go func() {
ch <- 42
}()
value := <-ch // 从ch中接收数据并赋值给value
  • **关闭channel**:使用内置的close函数关闭channel,关闭后不能再向其发送数据,但可以继续接收已发送的数据。接收完所有数据后,再接收将得到该类型的零值。例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 关闭channel
}()
for {
value, ok := <-ch
if!ok {
break // 当ok为false时,表示channel已关闭
}
fmt.Println(value)
}
3. 缓冲与非缓冲channel
  • **无缓冲channel**:也叫同步channel,数据的发送和接收必须同时准备好,即发送操作和接收操作会互相阻塞,直到对方准备好。只有当有对应的接收者在等待时,发送者才能发送数据;反之,只有当有发送者发送数据时,接收者才能接收数据。这确保了数据的同步传递。
  • **有缓冲channel**:内部有一个缓冲区,只要缓冲区未满,发送操作就不会阻塞;只要缓冲区不为空,接收操作就不会阻塞。当缓冲区满时,继续发送会阻塞;当缓冲区为空时,继续接收会阻塞。例如:
1
2
3
4
5
6
bufferedChan := make(chan int, 3)
bufferedChan <- 1
bufferedChan <- 2
bufferedChan <- 3
// 此时缓冲区已满,再发送会阻塞
// bufferedChan <- 4
4. 单向channel
  • 单向channel只能用于发送或接收数据,分别为只写channel(chan<- 数据类型)和只读channel(<-chan 数据类型)。单向channel主要用于函数参数传递,限制channel的使用方向,增强代码的可读性和安全性。例如:
1
2
3
4
5
6
7
8
9
10
// 只写channel
func sendData(ch chan<- int) {
ch <- 42
}

// 只读channel
func receiveData(ch <-chan int) {
data := <-ch
fmt.Println(data)
}
5. select语句与channel
  • select语句用于监听多个channel的操作,它可以同时等待多个channel的发送或接收操作。当有多个channel准备好时,select会随机选择一个执行。select语句还可以结合default分支实现非阻塞操作。例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
ch1 <- 1
}()

select {
case data := <-ch1:
fmt.Println("Received from ch1:", data)
case data := <-ch2:
fmt.Println("Received from ch2:", data)
default:
fmt.Println("No channel is ready")
}
6. channel的阻塞与死锁
  • 阻塞:发送和接收操作在channel未准备好时会阻塞当前goroutine。无缓冲channel在没有对应的接收者时发送会阻塞,没有发送者时接收会阻塞;有缓冲channel在缓冲区满时发送会阻塞,缓冲区空时接收会阻塞。
  • 死锁:如果在一个goroutine中,channel的发送和接收操作相互等待,且没有其他goroutine来打破这种等待,就会发生死锁。例如,一个goroutine向无缓冲channel发送数据,但没有其他goroutine接收;或者一个goroutine从无缓冲channel接收数据,但没有其他goroutine发送数据。运行时系统会检测到死锁并报错。
7. channel的底层实现
  • channel的底层实现基于一个名为hchan的结构体,它包含了当前队列中元素数量、环形队列大小(缓冲容量)、指向环形队列的指针、元素大小、关闭标志、元素类型信息、发送索引、接收索引、等待接收的协程队列、等待发送的协程队列以及一个互斥锁等字段。
  • 发送操作时,如果接收队列非空,直接将数据拷贝给第一个等待的接收者并唤醒该goroutine;如果缓冲区未满,将数据存入缓冲区;如果缓冲区已满或无缓冲channel,将当前goroutine加入发送队列并挂起。接收操作时,如果发送队列非空,直接从发送者获取数据并唤醒发送者;如果缓冲区不为空,从缓冲区取出数据;如果缓冲区为空且无缓冲channel,将当前goroutine加入接收队列并挂起。
8. channel误用导致的问题

在 Go 语言中,操作channel时可能导致panic或者死锁等:

1.多次关闭同一个channel

使用内置的close函数关闭channel后,如果再次调用close函数尝试关闭同一个channel,就会引发panic。这是因为channel的关闭状态是一种不可逆的操作,重复关闭没有实际意义,并且可能会导致难以调试的问题。例如:

1
2
3
ch := make(chan int)
close(ch)
close(ch) // 这里会导致panic

2.向已关闭的channel发送数据

当一个channel被关闭后,再向其发送数据会导致panic。因为关闭channel意味着不再有数据会被发送到该channel中,继续发送数据违反了这种约定。示例如下:

1
2
3
ch := make(chan int)
close(ch)
ch <- 1 // 向已关闭的channel发送数据,会导致panic

3.关闭未初始化(nil)的channel

如果尝试关闭一个值为nil的channel,会引发panic。nil的channel没有实际的底层数据结构来支持关闭操作。例如:

1
2
var ch chan int
close(ch) // 这里会导致panic,因为ch是nil

4.死锁导致的panic

在操作channel时,如果多个goroutine之间的通信和同步设计不当,可能会导致死锁。死锁发生时,所有涉及的goroutine都在互相等待对方,从而导致程序无法继续执行,运行时系统会检测到这种情况。例如:

1
2
3
4
5
func main() {
ch := make(chan int)
ch <- 1 // 没有其他goroutine从ch中接收数据,这里会阻塞,导致死锁
fmt.Println("This line will never be executed")
}
1
2
3
4
5
6
7
➜  my go run main.go
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
/Users/bytedance/My/work/go/my/main.go:172 +0x54
exit status 2

5.不恰当的select语句使用

在select语句中,如果没有default分支,并且所有的case对应的channel操作都无法立即执行(阻塞),那么当前goroutine会被阻塞。如果在主goroutine中发生这种情况且没有其他goroutine可以运行,就会导致死锁。例如:

1
2
3
4
5
6
7
8
9
10
11
func main() {
ch1 := make(chan int)
ch2 := make(chan int)

select {
case <-ch1:
// 没有数据发送到ch1,这里会阻塞
case <-ch2:
// 没有数据发送到ch2,这里会阻塞
}
}

要避免这些panic情况,编写代码时需要仔细设计channel的使用逻辑,合理处理channel的关闭、数据的发送和接收,以及确保goroutine之间的同步和通信正确无误。

解析

在NewChannel函数中,send和recv channel被赋值的是同一个ErrorChannel,而send和recv都是单向channel,一个只写,一个只读。

image-20250518214159352

image-20250518214217905

image-20250518214245810

所以当Process里send.ErrorChannel <- fmt.Errorf(“0 Start error \n”)执行的时候,main中的case <-recv.ErrorChannel被立即触发,然后执行recv.Close()函数,该函数执行了close(c.StopChannel),又触发了Process中的case <-send.StopChannel,执行了send.Close()。对于Process退出的时候,有defer,再次执行send.Close(),导致channel被多次关闭。

2.2defer

知识点

以前写过Go defer的一些神奇规则,你了解吗?,这次主要关注

  1. defer(延迟函数)执行按后进先出顺序执行,即先出现的 defer最后执行。
  2. Process中的defer的执行顺序与Process中的goroutine里的defer(如果有的话)执行顺序无关。

解析

其实这两个Close位置都有可能panic,主要看谁被先执行到。我是为了演示让Process sleep了1s。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
defer func() {
if send != nil {
fmt.Println("3 Process close defer")
send.Close()
}
}()
go func() {
for {
select {
case <-send.StopChannel:
fmt.Println("2 Process stop channel")
send.Close()
return
}
}
}()

2.3recover

知识点

在 Go 语言中,recover只能用于捕获当前goroutine内的panic,它的作用范围仅限于当前goroutine。具体说明如下:

**只能捕获当前goroutine的panic**:当一个goroutine发生panic时,该goroutine会沿着调用栈向上展开,执行所有已注册的defer函数。如果在这些defer函数中调用recover,则可以捕获到该goroutine内的panic,并恢复正常执行流程。而对于其他goroutine中发生的panic,当前goroutine无法通过recover捕获。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"time"
)

func worker() {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered in worker:", r)
}
}()
panic("Worker panicked")
}

func main() {
go worker()
time.Sleep(1 * time.Second)
fmt.Println("Main goroutine continues")
}

在上述代码中,worker函数中的defer语句里使用recover捕获了该goroutine内的panic。main函数中的goroutine并不会受到影响,继续执行并打印出 “Main goroutine continues”。

解析

当时之所以查的比较困难,主要是发现Process中go func里配置了recover,报了很多错,但感觉没有大问题。加上代码不熟悉,没有发现有概率触发Process的defer中的panic。而且公司的监控没有监控到自建goroutine的panic情况。

三、解决方案

在Process中添加recover

1
2
3
4
5
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered in worker:", r)
}
}()

其实比较建议在涉及channel相关的地方,都加个recover,尤其是不太熟悉的时候。

扫一扫,分享到微信

微信分享二维码
MySQL事务的一些奇奇怪怪知识
© 2025 John Doe
Hexo Theme Yilia by Litten