channel:从理解源码到应用

“不要通过共享内存来通信,而要通过通信来实现内存共享”

channel 的底层就是通过 mutex 来控制并发的。只是 channel 是更高一层次的并发编程原语,封装了更多的功能。

使用原子函数、读写锁可以保证资源的共享访问安全,但使用 channel 更优雅

lock 用来保证每个读 channel 或写 channel 的操作都是原子的

为什么要channel

Go 通过 channel 实现 CSP 通信模型,主要用于 goroutine 之间的消息传递和事件通知。

有了 channel 和 goroutine 之后,Go 的并发编程变得异常容易和安全,得以让程序员把注意力留到业务上去,实现开发效率的提升。

数据结构

type hchan struct {
    qcount   uint           // total data in the queue 当前队列(chan)中的元素个数
    dataqsiz uint           // size of the circular queue    环形队列容量
    buf      unsafe.Pointer // points to an array of dataqsiz elements 指针,指向底层环形队列存储(数组)
    elemsize uint16    // chan中元素大小 // c.elemsize = uint16(elem.size)
    closed   uint32    // 是否关闭
    elemtype *_type // element type channel容器内的元素类型
    sendx    uint   // send index    // 已发送元素在循环数组中的索引
    recvx    uint   // receive index // 已接收元素在循环数组中的索引
    recvq    waitq  // list of recv waiters // 等待接收的 goroutine 队列(双向链表)
    sendq    waitq  // list of send waiters // 等待发送的 goroutine 队列(双向链表)

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex    // 锁
}
img

解释:

buf指向底层数组,缓冲型channel会分配数组,非缓冲型数组则不分配。

sendxrecvx指向底层循环数组下标,表示当前可以发送和接受元素索引值,也就是环形队列中的队首和队尾。

waitq是元素为sudog的一个双向链表,而sudog是对goroutine的一个封装

type waitq struct {
    first *sudog
    last  *sudog
}

lock用来保证每个读channel或写channel的操作都是原子的。

例如,现在有一个容量为 6 的,元素为 int 型的 channel ,其数据结构如下 :

img

解释:

qcount(4)表示队列中有4个元素

dataqsiz(6)表示队列的容量为6

buf指向底层数组(环形队列)

elemsize(8)表示channel容器中的元素类型大小。比如对于chan int,其值为8

// var x int = 100
// fmt.Println(unsafe.Sizeof(x)) // 8 单位bit

创建channel

channel按是否带缓冲分类

Channel 分为两种:带缓冲、不带缓冲。对不带缓冲的 channel 进行的操作实际上可以看作“同步模式”,带缓冲的则称为“异步模式”。

不带缓冲的channel(同步模式)

func TestChannelBlocking(t *testing.T) {
    var ch = make(chan struct{})

    // 这个时候没有消费者就绪,此处会阻塞
    //ch <- struct{}{}    // fatal error: all goroutines are asleep - deadlock!

    wg.Add(1)
    // go 一个消费者
    go func(ch chan struct{}) {
        <-ch
        wg.Done()
    }(ch)

    // 生产者生成消息
    ch <- struct{}{}
    wg.Wait()
}

同步模式下,发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输(后面会看到,实际上就是内存拷贝)。否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。

带缓冲的channel(异步模式)

func TestChannelNonBlocking(t *testing.T) {
    var ch = make(chan struct{}, 2) // 可以缓冲2个元素
    // 生产者生成消息
    ch <- struct{}{}

    wg.Add(1)
    // go 一个消费者
    go func(ch chan struct{}) {
        <-ch
        wg.Done()
    }(ch)

    // 生产者生成消息
    ch <- struct{}{}
    wg.Wait()
}

异步模式下,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。

小结一下:同步模式下,必须要使发送方和接收方配对,操作才会成功,否则会被阻塞;异步模式下,缓冲槽要有剩余容量,操作才会成功,否则也会被阻塞。

channel按读写分类

只读channel

func goroutineA(a <-chan int) {
    val := <-a
    fmt.Println("G1 received data: ", val)
    wg.Done()
    return
}

func goroutineB(b <-chan int) {
    // b <- 10 这是一个错误的操作,因为b是一个只读channel,也就是说对于b来说没有写权限
    val := <-b
    fmt.Println("G2 received data: ", val)
    wg.Done()
    return
}

func TestChannelRecv(t *testing.T) {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    wg.Add(1)
    ch <- 3
    wg.Wait()
}

对于<-chan类型来说,表示这个chan只有读权限

只写channel

func goroutineC(a chan<- int) {
    a <- 1
    fmt.Println("C send data: 1")
    return
}

func goroutineD(a <-chan int) {
    val := <-a
    fmt.Println("D received data from C: ", val)
    wg.Done()
    return
}

func TestChannelSend(t *testing.T) {
    ch := make(chan int)
    wg.Add(1)
    go goroutineD(ch)
    go goroutineC(ch)
    wg.Wait()
}

小结一下:我们可以通过设置channel的读写权限来明确每个函数的身份(是消费者还是生产者,或者既是生产者又是消费者),同时可以有效避免只读成员发生误写操作、只写成员消费数据。

创建一个channel

我们可以通过make来创建一个channel,如var ch = make(chan int)。那么使用make创建一个channel都发生了什么呢?

func main() {
    makeChan()
}

func makeChan()  {
    _ = make(chan int)
}
//  CALL    runtime.makechan(SB)

通过查看汇编代码发现,是通过调用makechan来创建channel的,我们来看看makechan长什么样子吧。

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
  if elem.size >= 1<<16 {    // 从这里我们得到信息:channel最大容量为((1<<16) - 1)
        throw("makechan: invalid channel element type")
    }

  // 检查对齐字节数
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")    
    }

  // 获取要分配的内存
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0: 
        // Queue or element size is zero. // 无缓冲类型
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size) // chan的大小

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

makechan中我们得到下面信息:

  1. chan支持最大容量为(1<<16) - 1

  2. 对于无缓冲类型chan,不会去申请buf数组

  3. 我们传入的size(chan的容量),最终决定底层环形数组的长度

接收

接收值函数重载

从一个channel中获取值有两种形式

var x = <-ch    // 当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值
var xx, ok = <- ch // 如果ok == false,表示channel已经被关闭。
// 可能看到这里会有疑惑,为什么ok == false就说明channel是被关闭的?对于接收值函数的实现,传入的block参数值为true,就是说不会命中需要满足!block的分支,此外从其他分支看,对于已关闭的channel该值为false,对于未关闭的channel该值为true。因此在recv场景下,我们可以任务ok==false时,表示channel已经被关闭了。

func goroutineE(a chan<- int) {
    a <- 1
    fmt.Println("E send data: 1")
    close(a)
    return
}

func goroutineF(a <-chan int) {
    var ok bool
    val := <-a
    fmt.Printf("F received data from E %d, ok: %v \n", val, ok)
    val, ok = <-a
    fmt.Printf("F received data from E %d, ok: %v \n", val, ok)
    wg.Done()
    return
}

func TestChannelClose(t *testing.T) {
    ch := make(chan int)
    wg.Add(1)
    go goroutineE(ch)
    go goroutineF(ch)
    wg.Wait()
}
/*
=== RUN   TestChannelClose
E send data: 1
F received data from E 1, ok: false 
F received data from E 0, ok: false 
--- PASS: TestChannelClose (0.00s)
PASS
*/

通过反汇编处理,发现分别调用的是下面两种函数

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

其实都是调用chanrecv

// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    if c == nil {
        if !block {    // 非阻塞
            return  // (false,false)
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
    // 当我们观察到 channel 没准备好接收:
    // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
    // 2. 缓冲型,但 buf 里没有元素
    // 之后,又观察到 closed == 0,即 channel 未关闭。
    // 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
    // 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()    
    }

    lock(&c.lock)

  // channel 已关闭,并且循环数组 buf 里没有元素
    // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
    // 也就是说即使是关闭状态,但在缓冲型的 channel,
    // buf 里有元素的情况下还能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        // 从一个已关闭的 channel 执行接收操作,且未忽略返回值
    // 那么接收的值将是一个该类型的零值
    // typedmemclr 根据类型清理相应地址的内存
    if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
    // 从一个已关闭的 channel 接收,selected 会返回true
        return true, false
    }

  // 等待发送队列里有 goroutine 存在,说明 buf 是满的
    // 这有可能是:
    // 1. 非缓冲型的 channel
    // 2. 缓冲型的 channel,但 buf 满了
    // 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
    // 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

  // 缓冲型,buf 里有元素,可以正常接收
    if c.qcount > 0 {
        // Receive directly from queue
    // 直接从循环数组里找到要接收的元素
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
    // 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
    // 清理掉循环数组里相应位置的值。
        typedmemclr(c.elemtype, qp)
    // 环形队列,队首游标左移
        c.recvx++
    // 队首游标溢出检查
        if c.recvx == c.dataqsiz {
            c.recvx = 0    // 置0
        }
    // 数组元素减1
        c.qcount--
        unlock(&c.lock)
    // 从一个未关闭的channel接收到值
        return true, true        
    }

    if !block {
        unlock(&c.lock)
    // 非阻塞接收,selected 返回 false,因为没有接收到值
        return false, false
    }

    // no sender available: block on this channel.
  // 阻塞接收
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
  // 写待接收是数据的地址
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 阻塞。进入等待接收队列
  c.recvq.enqueue(mysg)
  // 将当前goroutine挂起
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
  // 被唤醒了
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
  // 成功接收到数据(true),以及channel的开关状态(是否被关闭了)
    return true, !closed
}

recvchan中我们得知:

  1. 在非阻塞模式下并且channel没有被关闭。如果是非缓冲型channel,等待发送列队 sendq 里没有 goroutine 在等待,又或者是缓冲型channel,但 buf 里没有元素。select将无法从中获得数据,selected == false,这里的received也为false会是代表channel已经被关闭了?其实不是,那只是在recv也就是<- ch场景下我们可以这样认为,这也是为什么在上面解释的原因。

    // compiler implements
    //
    //    select {
    //    case v = <-c:
    //        ... foo
    //    default:
    //        ... bar
    //    }
    //
    // as
    //
    //    if selectnbrecv(&v, c) {
    //        ... foo
    //    } else {
    //        ... bar
    //    }
    //
    func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
        selected, _ = chanrecv(c, elem, false)
        return
    }
    
    // compiler implements
    //
    //    select {
    //    case v, ok = <-c:
    //        ... foo
    //    default:
    //        ... bar
    //    }
    //
    // as
    //
    //    if c != nil && selectnbrecv2(&v, &ok, c) {
    //        ... foo
    //    } else {
    //        ... bar
    //    }
    //
    func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
        // TODO(khr): just return 2 values from this function, now that it is in Go.
        selected, *received = chanrecv(c, elem, false)
        return
    }
    
    // block参数值为false,那么对一个nil chan进行select并不会像接收操作那样panic
    // 如果是一个 nil 的 channel
    if c == nil {
      // 如果不阻塞,直接返回 (false, false)
      if !block {
        return
      }
      // 否则,接收一个 nil 的 channel,goroutine 挂起
      gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
      // 不会执行到这里
      throw("unreachable")
    }
    
    func TestChannelSelectFromNil(t *testing.T) {
        var ch chan int
        var x int
        var ok bool
    
        var cnt = 12
        var hit int
        for cnt > 0 {
            select {
            case x = <-ch:
                fmt.Println(x)
            case x, ok = <-ch:
                fmt.Println(x, ok)
            default:
                //fmt.Println("default")
                hit++
            }
            cnt--
        }
        fmt.Println(hit)    // 12
    }
  2. channel已经关闭,并且里面已经没有元素,我们仍然可以从中获得元素(得到的是channel容器元素中类型的零值)

  3. 如果等待发送队列中有goroutine存在。如果是非缓冲型channel,那么直接将其拷贝到接收者的栈;如果是缓冲型channel,那么将弹出队首元素,将其添加到队尾。

  4. 如果缓冲型channel中有元素,那么对于"val <- ch"这种方式的接收则会通过typedmemmove进行拷贝,然后回收环形队列的队首。

  5. 如果channel处于阻塞状态并且等待发送队列中不存在goroutine,那么当前go routine将会被挂起,直到有数据被send进channel而被唤醒。

发送

和分析<- ch一样,我们通过查看汇编代码得知ch <-最后调用的是chansend函数。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
    // 不能阻塞,直接返回 false,表示未发送成功
        if !block {
            return false
        }
    // 阻塞,当前 goroutine 被挂起
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
    // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
    // 2. channel 是缓冲型的,但循环数组已经装满了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

  // 如果channel关闭了,向里面发送数据则会panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

  // 如果等待接收队列里有goroutine,则直接将其拷贝给接收gouroutine
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

  // 对于缓冲型channel,如果还有缓冲空间
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep) //  将数据从ep处拷贝到qp处
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true // 发送成功
    }

  // 如果是非阻塞模式下,则直接提示发送失败
    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
  // 当前gorutine进入等待发送队列
    c.sendq.enqueue(mysg)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
  // 被唤醒,有机会发送了
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
    // 被唤醒后,发现channel被关闭了,那么将panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil    // 解除channel绑定信息
    releaseSudog(mysg)
    return true
}

发送数据会有以下情况:

  1. 如果channel为nil,在非阻塞模式下会提示发送失败(return false),在阻塞模式下当前goroutine则会被挂起

  2. 在非阻塞模式下,如果发送数据给非缓冲型channel或者是元素已满的缓冲型channel,则会提示发送失败。

  3. 如果当前channel已经被关闭,当尝试向其发送数据时会panic

  4. 如果等待接收队列里有goroutine,直接将要发送的数据拷贝到接收goroutine。(当等待接收队列里有goroutine的时候说明,该channel是非缓冲型channel或者是元素为空的缓冲型channel)

  5. 如果是缓冲型channel并且还有缓冲空间,那么将数据拷贝到buf数组中(环形队列),提示发送成功

  6. 如果是缓冲型channel并且已经满了,那么将被挂起,直到被唤醒。

关闭

关闭channel,会执行函数closechan

func closechan(c *hchan) {
  // 关闭一个nil channel,panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
  // 关闭一个非打开状态的channel,panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

  // 修改channel状态为 已关闭
    c.closed = 1

    var glist gList

    // release all readers
  // 将 channel 所有等待接收队列的里 sudog 释放
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
  // 将 channel 等待发送队列里的 sudog 释放
    // 如果存在,这些 goroutine 将会 panic
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
  // 将goroutine唤醒
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

从代码得知:

  1. 如果我们关闭一个nil channel会panic

  2. 如果我们关闭一个非打开状态的channel,会panic

  3. 成功关闭一个channel的话。如果等待接收队列里还有goroutine那么将会被释放;如果等待发送队列里还有goroutine将会被释放,只是这些goroutine都是因为panic而异常退出的。

channel异常总结

channel异常总结

关闭已经关闭的channel也会引发panic

channel的应用

停止信号

// 摘自go时间轮实现,通过stop channel传递stop信号
// Stop 停止时间轮
func (tw *wheel) Stop() {
    tw.stop <- struct{}{}
}

// Scheduler 调度
func (tw *WheelWithHeap) scheduler() {
    for {
        select {
        case <-tw.ticker.C:
            tw.handleTick() // 处理延时任务
        case aTask := <-tw.taskChannel.add:
            tw.registerTask(aTask) //  注册延时任务
        case rTask := <-tw.taskChannel.remove:
            tw.collectTask(rTask) // 回收延时任务
        case <-tw.stop:
            tw.exited = true
            tw.ticker.Stop()
            return
        }
    }
}

任务定时

延时任务

select {
    case <-time.After(100 * time.Millisecond):
    case <-s.stopc:
        return false
}

定期任务

func worker() {
    ticker := time.Tick(1 * time.Second)
    for {
        select {
        case <- ticker:
            // 执行定时任务
            fmt.Println("执行 1s 定时任务")
        }
    }
}

解耦生产方和消费方

func TestChannelUseWithWorker(t *testing.T) {
    var ch = make(chan int, 100)
    wg.Add(10)
    for i := 0; i < 10; i++ {
        ch <- i
    }
    go worker(ch)
    wg.Wait()
}

func worker(ch <-chan int) {
    // 启动 5 个工作协程
    for i := 0; i < 5; i++ {
        go func(id int) {
            for {
                select {
                case task := <-ch:
                    fmt.Printf("finish task: %d by worker %d\n", task, id)
                    time.Sleep(time.Second)
                    wg.Done()
                default:
                }
            }
        }(i)
    }
}

控制并发

// 参考https://github.com/1005281342/test_tools/blob/master/survey/func.go
func Func2(f func(interface{}), cnt int) func(int) {
    return func(n int) {
        ch := make(chan int, cnt)
        for i, v := range RandShuffle(n) {
            wg.Add(1)
            go func(a int) {
        ch <- i
                defer func() {
          // 避免f(a)异常退出导致阻塞
                    <-ch
          wg.Done()
                }()
                f(a)
            }(v)
        }

        for i := 0; i < cnt; i++ {
            ch <- i
        }
        wg.Wait()
    }
}

内存泄露

“通道可能会引发goroutine leak,确切地说,是指goroutine处于发送或接收阻塞状态,但一直未被唤醒。垃圾回收器并不收集此类资源,导致它们会在等待队列里长久休眠,形成资源泄漏。”

摘录来自: 雨痕. Go语言学习笔记。

func test() { 
   c:=make(chan int) 

   for i:=0;i<10;i++ { 
       go func() { 
            <-c
        }() 
    } 
} 

func main() { 
   test() 

   for{ 
       time.Sleep(time.Second) 
       runtime.GC()          // 强制垃圾回收 
    } 
}
// $go build-o test
// $GODEBUG="gctrace=1,schedtrace=1000,scheddetail=1" ./test
// 从监控结果可以看到大量goroutine一直处于chan receive状态,无法结束

参考

深度解密Go语言之channel

Golang 源码导读 —— channel

我可能并不会使用golang chan

Go语言基础之并发

雨痕. Go语言学习笔记

最后更新于

这有帮助吗?