channel:从理解源码到应用
“不要通过共享内存来通信,而要通过通信来实现内存共享”
channel 的底层就是通过 mutex 来控制并发的。只是 channel 是更高一层次的并发编程原语,封装了更多的功能。
使用原子函数、读写锁可以保证资源的共享访问安全,但使用 channel 更优雅
lock
用来保证每个读 channel 或写 channel 的操作都是原子的
为什么要channel
Go 通过 channel 实现 CSP 通信模型,主要用于 goroutine 之间的消息传递和事件通知。
有了 channel 和 goroutine 之后,Go 的并发编程变得异常容易和安全,得以让程序员把注意力留到业务上去,实现开发效率的提升。
数据结构
type hchan struct {
qcount uint // total data in the queue 当前队列(chan)中的元素个数
dataqsiz uint // size of the circular queue 环形队列容量
buf unsafe.Pointer // points to an array of dataqsiz elements 指针,指向底层环形队列存储(数组)
elemsize uint16 // chan中元素大小 // c.elemsize = uint16(elem.size)
closed uint32 // 是否关闭
elemtype *_type // element type channel容器内的元素类型
sendx uint // send index // 已发送元素在循环数组中的索引
recvx uint // receive index // 已接收元素在循环数组中的索引
recvq waitq // list of recv waiters // 等待接收的 goroutine 队列(双向链表)
sendq waitq // list of send waiters // 等待发送的 goroutine 队列(双向链表)
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex // 锁
}
解释:
buf
指向底层数组,缓冲型channel
会分配数组,非缓冲型数组则不分配。
sendx
,recvx
指向底层循环数组下标,表示当前可以发送和接受元素索引值,也就是环形队列中的队首和队尾。
waitq
是元素为sudog
的一个双向链表,而sudog
是对goroutine
的一个封装
type waitq struct {
first *sudog
last *sudog
}
lock
用来保证每个读channel
或写channel
的操作都是原子的。
例如,现在有一个容量为 6 的,元素为 int 型的 channel ,其数据结构如下 :
解释:
qcount(4)
表示队列中有4个元素
dataqsiz(6)
表示队列的容量为6
buf
指向底层数组(环形队列)
elemsize(8)
表示channel容器中的元素类型大小。比如对于chan int
,其值为8
// var x int = 100
// fmt.Println(unsafe.Sizeof(x)) // 8 单位bit
创建channel
channel按是否带缓冲分类
Channel 分为两种:带缓冲、不带缓冲。对不带缓冲的 channel 进行的操作实际上可以看作“同步模式”,带缓冲的则称为“异步模式”。
不带缓冲的channel(同步模式)
func TestChannelBlocking(t *testing.T) {
var ch = make(chan struct{})
// 这个时候没有消费者就绪,此处会阻塞
//ch <- struct{}{} // fatal error: all goroutines are asleep - deadlock!
wg.Add(1)
// go 一个消费者
go func(ch chan struct{}) {
<-ch
wg.Done()
}(ch)
// 生产者生成消息
ch <- struct{}{}
wg.Wait()
}
同步模式下,发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输(后面会看到,实际上就是内存拷贝)。否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。
带缓冲的channel(异步模式)
func TestChannelNonBlocking(t *testing.T) {
var ch = make(chan struct{}, 2) // 可以缓冲2个元素
// 生产者生成消息
ch <- struct{}{}
wg.Add(1)
// go 一个消费者
go func(ch chan struct{}) {
<-ch
wg.Done()
}(ch)
// 生产者生成消息
ch <- struct{}{}
wg.Wait()
}
异步模式下,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。
小结一下:同步模式下,必须要使发送方和接收方配对,操作才会成功,否则会被阻塞;异步模式下,缓冲槽要有剩余容量,操作才会成功,否则也会被阻塞。
channel按读写分类
只读channel
func goroutineA(a <-chan int) {
val := <-a
fmt.Println("G1 received data: ", val)
wg.Done()
return
}
func goroutineB(b <-chan int) {
// b <- 10 这是一个错误的操作,因为b是一个只读channel,也就是说对于b来说没有写权限
val := <-b
fmt.Println("G2 received data: ", val)
wg.Done()
return
}
func TestChannelRecv(t *testing.T) {
ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
wg.Add(1)
ch <- 3
wg.Wait()
}
对于<-chan
类型来说,表示这个chan只有读权限
只写channel
func goroutineC(a chan<- int) {
a <- 1
fmt.Println("C send data: 1")
return
}
func goroutineD(a <-chan int) {
val := <-a
fmt.Println("D received data from C: ", val)
wg.Done()
return
}
func TestChannelSend(t *testing.T) {
ch := make(chan int)
wg.Add(1)
go goroutineD(ch)
go goroutineC(ch)
wg.Wait()
}
小结一下:我们可以通过设置channel
的读写权限来明确每个函数的身份(是消费者还是生产者,或者既是生产者又是消费者),同时可以有效避免只读成员发生误写操作、只写成员消费数据。
创建一个channel
我们可以通过make
来创建一个channel,如var ch = make(chan int)
。那么使用make
创建一个channel都发生了什么呢?
func main() {
makeChan()
}
func makeChan() {
_ = make(chan int)
}
// CALL runtime.makechan(SB)
通过查看汇编代码发现,是通过调用makechan
来创建channel的,我们来看看makechan
长什么样子吧。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 { // 从这里我们得到信息:channel最大容量为((1<<16) - 1)
throw("makechan: invalid channel element type")
}
// 检查对齐字节数
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 获取要分配的内存
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero. // 无缓冲类型
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size) // chan的大小
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
从makechan
中我们得到下面信息:
chan支持最大容量为
(1<<16) - 1
对于无缓冲类型chan,不会去申请buf数组
我们传入的size(chan的容量),最终决定底层环形数组的长度
接收
接收值函数重载
从一个channel中获取值有两种形式
var x = <-ch // 当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值
var xx, ok = <- ch // 如果ok == false,表示channel已经被关闭。
// 可能看到这里会有疑惑,为什么ok == false就说明channel是被关闭的?对于接收值函数的实现,传入的block参数值为true,就是说不会命中需要满足!block的分支,此外从其他分支看,对于已关闭的channel该值为false,对于未关闭的channel该值为true。因此在recv场景下,我们可以任务ok==false时,表示channel已经被关闭了。
func goroutineE(a chan<- int) {
a <- 1
fmt.Println("E send data: 1")
close(a)
return
}
func goroutineF(a <-chan int) {
var ok bool
val := <-a
fmt.Printf("F received data from E %d, ok: %v \n", val, ok)
val, ok = <-a
fmt.Printf("F received data from E %d, ok: %v \n", val, ok)
wg.Done()
return
}
func TestChannelClose(t *testing.T) {
ch := make(chan int)
wg.Add(1)
go goroutineE(ch)
go goroutineF(ch)
wg.Wait()
}
/*
=== RUN TestChannelClose
E send data: 1
F received data from E 1, ok: false
F received data from E 0, ok: false
--- PASS: TestChannelClose (0.00s)
PASS
*/
通过反汇编处理,发现分别调用的是下面两种函数
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
其实都是调用chanrecv
// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block { // 非阻塞
return // (false,false)
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
// 当我们观察到 channel 没准备好接收:
// 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
// 2. 缓冲型,但 buf 里没有元素
// 之后,又观察到 closed == 0,即 channel 未关闭。
// 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
// 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// channel 已关闭,并且循环数组 buf 里没有元素
// 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
// 也就是说即使是关闭状态,但在缓冲型的 channel,
// buf 里有元素的情况下还能接收到元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
// 从一个已关闭的 channel 执行接收操作,且未忽略返回值
// 那么接收的值将是一个该类型的零值
// typedmemclr 根据类型清理相应地址的内存
if ep != nil {
typedmemclr(c.elemtype, ep)
}
// 从一个已关闭的 channel 接收,selected 会返回true
return true, false
}
// 等待发送队列里有 goroutine 存在,说明 buf 是满的
// 这有可能是:
// 1. 非缓冲型的 channel
// 2. 缓冲型的 channel,但 buf 满了
// 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
// 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 缓冲型,buf 里有元素,可以正常接收
if c.qcount > 0 {
// Receive directly from queue
// 直接从循环数组里找到要接收的元素
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循环数组里相应位置的值。
typedmemclr(c.elemtype, qp)
// 环形队列,队首游标左移
c.recvx++
// 队首游标溢出检查
if c.recvx == c.dataqsiz {
c.recvx = 0 // 置0
}
// 数组元素减1
c.qcount--
unlock(&c.lock)
// 从一个未关闭的channel接收到值
return true, true
}
if !block {
unlock(&c.lock)
// 非阻塞接收,selected 返回 false,因为没有接收到值
return false, false
}
// no sender available: block on this channel.
// 阻塞接收
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 写待接收是数据的地址
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 阻塞。进入等待接收队列
c.recvq.enqueue(mysg)
// 将当前goroutine挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
// 被唤醒了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
// 成功接收到数据(true),以及channel的开关状态(是否被关闭了)
return true, !closed
}
从recvchan
中我们得知:
在非阻塞模式下并且channel没有被关闭。如果是非缓冲型channel,等待发送列队 sendq 里没有 goroutine 在等待,又或者是缓冲型channel,但 buf 里没有元素。select将无法从中获得数据,selected == false,这里的
received
也为false
会是代表channel已经被关闭了?其实不是,那只是在recv
也就是<- ch
场景下我们可以这样认为,这也是为什么在上面解释的原因。// compiler implements // // select { // case v = <-c: // ... foo // default: // ... bar // } // // as // // if selectnbrecv(&v, c) { // ... foo // } else { // ... bar // } // func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(c, elem, false) return } // compiler implements // // select { // case v, ok = <-c: // ... foo // default: // ... bar // } // // as // // if c != nil && selectnbrecv2(&v, &ok, c) { // ... foo // } else { // ... bar // } // func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) { // TODO(khr): just return 2 values from this function, now that it is in Go. selected, *received = chanrecv(c, elem, false) return } // block参数值为false,那么对一个nil chan进行select并不会像接收操作那样panic // 如果是一个 nil 的 channel if c == nil { // 如果不阻塞,直接返回 (false, false) if !block { return } // 否则,接收一个 nil 的 channel,goroutine 挂起 gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2) // 不会执行到这里 throw("unreachable") } func TestChannelSelectFromNil(t *testing.T) { var ch chan int var x int var ok bool var cnt = 12 var hit int for cnt > 0 { select { case x = <-ch: fmt.Println(x) case x, ok = <-ch: fmt.Println(x, ok) default: //fmt.Println("default") hit++ } cnt-- } fmt.Println(hit) // 12 }
channel已经关闭,并且里面已经没有元素,我们仍然可以从中获得元素(得到的是channel容器元素中类型的零值)
如果等待发送队列中有goroutine存在。如果是非缓冲型channel,那么直接将其拷贝到接收者的栈;如果是缓冲型channel,那么将弹出队首元素,将其添加到队尾。
如果缓冲型channel中有元素,那么对于"val <- ch"这种方式的接收则会通过
typedmemmove
进行拷贝,然后回收环形队列的队首。如果channel处于阻塞状态并且等待发送队列中不存在goroutine,那么当前go routine将会被挂起,直到有数据被send进channel而被唤醒。
发送
和分析<- ch
一样,我们通过查看汇编代码得知ch <-
最后调用的是chansend
函数。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 不能阻塞,直接返回 false,表示未发送成功
if !block {
return false
}
// 阻塞,当前 goroutine 被挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
// 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
// 2. channel 是缓冲型的,但循环数组已经装满了元素
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 如果channel关闭了,向里面发送数据则会panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果等待接收队列里有goroutine,则直接将其拷贝给接收gouroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 对于缓冲型channel,如果还有缓冲空间
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep) // 将数据从ep处拷贝到qp处
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true // 发送成功
}
// 如果是非阻塞模式下,则直接提示发送失败
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 当前gorutine进入等待发送队列
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 被唤醒,有机会发送了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被唤醒后,发现channel被关闭了,那么将panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil // 解除channel绑定信息
releaseSudog(mysg)
return true
}
发送数据会有以下情况:
如果channel为nil,在非阻塞模式下会提示发送失败(return false),在阻塞模式下当前goroutine则会被挂起
在非阻塞模式下,如果发送数据给非缓冲型channel或者是元素已满的缓冲型channel,则会提示发送失败。
如果当前channel已经被关闭,当尝试向其发送数据时会panic
如果等待接收队列里有goroutine,直接将要发送的数据拷贝到接收goroutine。(当等待接收队列里有goroutine的时候说明,该channel是非缓冲型channel或者是元素为空的缓冲型channel)
如果是缓冲型channel并且还有缓冲空间,那么将数据拷贝到buf数组中(环形队列),提示发送成功
如果是缓冲型channel并且已经满了,那么将被挂起,直到被唤醒。
关闭
关闭channel,会执行函数closechan
func closechan(c *hchan) {
// 关闭一个nil channel,panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 关闭一个非打开状态的channel,panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 修改channel状态为 已关闭
c.closed = 1
var glist gList
// release all readers
// 将 channel 所有等待接收队列的里 sudog 释放
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 将 channel 等待发送队列里的 sudog 释放
// 如果存在,这些 goroutine 将会 panic
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 将goroutine唤醒
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
从代码得知:
如果我们关闭一个
nil channel
会panic如果我们关闭一个非打开状态的channel,会panic
成功关闭一个channel的话。如果等待接收队列里还有goroutine那么将会被释放;如果等待发送队列里还有goroutine将会被释放,只是这些goroutine都是因为panic而异常退出的。
channel异常总结
关闭已经关闭的channel
也会引发panic
channel的应用
停止信号
// 摘自go时间轮实现,通过stop channel传递stop信号
// Stop 停止时间轮
func (tw *wheel) Stop() {
tw.stop <- struct{}{}
}
// Scheduler 调度
func (tw *WheelWithHeap) scheduler() {
for {
select {
case <-tw.ticker.C:
tw.handleTick() // 处理延时任务
case aTask := <-tw.taskChannel.add:
tw.registerTask(aTask) // 注册延时任务
case rTask := <-tw.taskChannel.remove:
tw.collectTask(rTask) // 回收延时任务
case <-tw.stop:
tw.exited = true
tw.ticker.Stop()
return
}
}
}
任务定时
延时任务
select {
case <-time.After(100 * time.Millisecond):
case <-s.stopc:
return false
}
定期任务
func worker() {
ticker := time.Tick(1 * time.Second)
for {
select {
case <- ticker:
// 执行定时任务
fmt.Println("执行 1s 定时任务")
}
}
}
解耦生产方和消费方
func TestChannelUseWithWorker(t *testing.T) {
var ch = make(chan int, 100)
wg.Add(10)
for i := 0; i < 10; i++ {
ch <- i
}
go worker(ch)
wg.Wait()
}
func worker(ch <-chan int) {
// 启动 5 个工作协程
for i := 0; i < 5; i++ {
go func(id int) {
for {
select {
case task := <-ch:
fmt.Printf("finish task: %d by worker %d\n", task, id)
time.Sleep(time.Second)
wg.Done()
default:
}
}
}(i)
}
}
控制并发
// 参考https://github.com/1005281342/test_tools/blob/master/survey/func.go
func Func2(f func(interface{}), cnt int) func(int) {
return func(n int) {
ch := make(chan int, cnt)
for i, v := range RandShuffle(n) {
wg.Add(1)
go func(a int) {
ch <- i
defer func() {
// 避免f(a)异常退出导致阻塞
<-ch
wg.Done()
}()
f(a)
}(v)
}
for i := 0; i < cnt; i++ {
ch <- i
}
wg.Wait()
}
}
内存泄露
“通道可能会引发goroutine leak,确切地说,是指goroutine处于发送或接收阻塞状态,但一直未被唤醒。垃圾回收器并不收集此类资源,导致它们会在等待队列里长久休眠,形成资源泄漏。”
摘录来自: 雨痕. Go语言学习笔记。
func test() {
c:=make(chan int)
for i:=0;i<10;i++ {
go func() {
<-c
}()
}
}
func main() {
test()
for{
time.Sleep(time.Second)
runtime.GC() // 强制垃圾回收
}
}
// $go build-o test
// $GODEBUG="gctrace=1,schedtrace=1000,scheddetail=1" ./test
// 从监控结果可以看到大量goroutine一直处于chan receive状态,无法结束
参考
雨痕. Go语言学习笔记
最后更新于
这有帮助吗?