# channel：从理解源码到应用

“不要通过共享内存来通信，而要通过通信来实现内存共享”

channel 的底层就是通过 mutex 来控制并发的。只是 channel 是更高一层次的并发编程原语，封装了更多的功能。

使用原子函数、读写锁可以保证资源的共享访问安全，但使用 channel 更优雅

`lock` 用来保证每个读 channel 或写 channel 的操作都是原子的

## 为什么要channel

Go 通过 channel 实现 CSP 通信模型，主要用于 goroutine 之间的消息传递和事件通知。

有了 channel 和 goroutine 之后，Go 的并发编程变得异常容易和安全，得以让程序员把注意力留到业务上去，实现开发效率的提升。

## 数据结构

```go
type hchan struct {
    qcount   uint           // total data in the queue 当前队列（chan）中的元素个数
    dataqsiz uint           // size of the circular queue    环形队列容量
    buf      unsafe.Pointer // points to an array of dataqsiz elements 指针，指向底层环形队列存储（数组）
    elemsize uint16    // chan中元素大小 // c.elemsize = uint16(elem.size)
    closed   uint32    // 是否关闭
    elemtype *_type // element type channel容器内的元素类型
    sendx    uint   // send index    // 已发送元素在循环数组中的索引
    recvx    uint   // receive index // 已接收元素在循环数组中的索引
    recvq    waitq  // list of recv waiters // 等待接收的 goroutine 队列（双向链表）
    sendq    waitq  // list of send waiters // 等待发送的 goroutine 队列（双向链表）

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex    // 锁
}
```

![img](https://122767808-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MBDSdFU2kWrJocOCeSH%2Fsync%2Fd8b88c7b5d7983bf2c7df3418817265790f77a51.png?generation=1600008661404152\&alt=media)

解释：

`buf`指向底层数组，缓冲型`channel`会分配数组，非缓冲型数组则不分配。

`sendx`，`recvx`指向底层循环数组下标，表示当前可以发送和接受元素索引值，也就是环形队列中的队首和队尾。

`waitq`是元素为`sudog`的一个双向链表，而`sudog`是对`goroutine`的一个封装

```go
type waitq struct {
    first *sudog
    last  *sudog
}
```

`lock`用来保证每个读`channel`或写`channel`的操作都是原子的。

例如，现在有一个容量为 6 的，元素为 int 型的 channel ，其数据结构如下 ：

![img](https://122767808-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MBDSdFU2kWrJocOCeSH%2Fsync%2F26217690bea596d0facef62f125ea458c8835bdf.png?generation=1600008659451558\&alt=media)

解释：

`qcount(4)`表示队列中有4个元素

`dataqsiz(6)`表示队列的容量为6

`buf`指向底层数组（环形队列）

`elemsize(8)`表示channel容器中的元素类型大小。比如对于`chan int`，其值为8

```go
// var x int = 100
// fmt.Println(unsafe.Sizeof(x)) // 8 单位bit
```

## 创建channel

### channel按是否带缓冲分类

Channel 分为两种：带缓冲、不带缓冲。对不带缓冲的 channel 进行的操作实际上可以看作“同步模式”，带缓冲的则称为“异步模式”。

**不带缓冲的channel（同步模式）**

```go
func TestChannelBlocking(t *testing.T) {
    var ch = make(chan struct{})

    // 这个时候没有消费者就绪，此处会阻塞
    //ch <- struct{}{}    // fatal error: all goroutines are asleep - deadlock!

    wg.Add(1)
    // go 一个消费者
    go func(ch chan struct{}) {
        <-ch
        wg.Done()
    }(ch)

    // 生产者生成消息
    ch <- struct{}{}
    wg.Wait()
}
```

同步模式下，发送方和接收方要同步就绪，只有在两者都 ready 的情况下，数据才能在两者间传输（后面会看到，实际上就是内存拷贝）。否则，任意一方先行进行发送或接收操作，都会被挂起，等待另一方的出现才能被唤醒。

**带缓冲的channel（异步模式）**

```go
func TestChannelNonBlocking(t *testing.T) {
    var ch = make(chan struct{}, 2) // 可以缓冲2个元素
    // 生产者生成消息
    ch <- struct{}{}

    wg.Add(1)
    // go 一个消费者
    go func(ch chan struct{}) {
        <-ch
        wg.Done()
    }(ch)

    // 生产者生成消息
    ch <- struct{}{}
    wg.Wait()
}
```

异步模式下，在缓冲槽可用的情况下（有剩余容量），发送和接收操作都可以顺利进行。否则，操作的一方（如写入）同样会被挂起，直到出现相反操作（如接收）才会被唤醒。

小结一下：同步模式下，必须要使发送方和接收方配对，操作才会成功，否则会被阻塞；异步模式下，缓冲槽要有剩余容量，操作才会成功，否则也会被阻塞。

### channel按读写分类

**只读channel**

```go
func goroutineA(a <-chan int) {
    val := <-a
    fmt.Println("G1 received data: ", val)
    wg.Done()
    return
}

func goroutineB(b <-chan int) {
    // b <- 10 这是一个错误的操作，因为b是一个只读channel，也就是说对于b来说没有写权限
    val := <-b
    fmt.Println("G2 received data: ", val)
    wg.Done()
    return
}

func TestChannelRecv(t *testing.T) {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    wg.Add(1)
    ch <- 3
    wg.Wait()
}
```

对于`<-chan`类型来说，表示这个chan只有读权限

**只写channel**

```go
func goroutineC(a chan<- int) {
    a <- 1
    fmt.Println("C send data: 1")
    return
}

func goroutineD(a <-chan int) {
    val := <-a
    fmt.Println("D received data from C: ", val)
    wg.Done()
    return
}

func TestChannelSend(t *testing.T) {
    ch := make(chan int)
    wg.Add(1)
    go goroutineD(ch)
    go goroutineC(ch)
    wg.Wait()
}
```

小结一下：我们可以通过设置`channel`的读写权限来明确每个函数的身份（是消费者还是生产者，或者既是生产者又是消费者），同时可以有效避免只读成员发生误写操作、只写成员消费数据。

### 创建一个channel

我们可以通过`make`来创建一个channel，如`var ch = make(chan int)`。那么使用`make`创建一个channel都发生了什么呢？

```go
func main() {
    makeChan()
}

func makeChan()  {
    _ = make(chan int)
}
//  CALL    runtime.makechan(SB)
```

通过查看汇编代码发现，是通过调用`makechan`来创建channel的，我们来看看`makechan`长什么样子吧。

```go
func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
  if elem.size >= 1<<16 {    // 从这里我们得到信息：channel最大容量为（(1<<16) - 1）
        throw("makechan: invalid channel element type")
    }

  // 检查对齐字节数
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")    
    }

  // 获取要分配的内存
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0: 
        // Queue or element size is zero. // 无缓冲类型
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size) // chan的大小

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}
```

从`makechan`中我们得到下面信息：

1. chan支持最大容量为`(1<<16) - 1`
2. 对于无缓冲类型chan，不会去申请buf数组
3. 我们传入的size（chan的容量），最终决定底层环形数组的长度

## 接收

### 接收值函数重载

从一个channel中获取值有两种形式

```go
var x = <-ch    // 当接收到相应类型的零值时无法知道是真实的发送者发送过来的值，还是 channel 被关闭后，返回给接收者的默认类型的零值
var xx, ok = <- ch // 如果ok == false，表示channel已经被关闭。
// 可能看到这里会有疑惑，为什么ok == false就说明channel是被关闭的？对于接收值函数的实现，传入的block参数值为true，就是说不会命中需要满足!block的分支，此外从其他分支看，对于已关闭的channel该值为false，对于未关闭的channel该值为true。因此在recv场景下，我们可以任务ok==false时，表示channel已经被关闭了。

func goroutineE(a chan<- int) {
    a <- 1
    fmt.Println("E send data: 1")
    close(a)
    return
}

func goroutineF(a <-chan int) {
    var ok bool
    val := <-a
    fmt.Printf("F received data from E %d, ok: %v \n", val, ok)
    val, ok = <-a
    fmt.Printf("F received data from E %d, ok: %v \n", val, ok)
    wg.Done()
    return
}

func TestChannelClose(t *testing.T) {
    ch := make(chan int)
    wg.Add(1)
    go goroutineE(ch)
    go goroutineF(ch)
    wg.Wait()
}
/*
=== RUN   TestChannelClose
E send data: 1
F received data from E 1, ok: false 
F received data from E 0, ok: false 
--- PASS: TestChannelClose (0.00s)
PASS
*/
```

通过反汇编处理，发现分别调用的是下面两种函数

```go
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}
```

其实都是调用`chanrecv`

```go
// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil，说明忽略了接收值。
// 如果 block == false，即非阻塞型接收，在没有数据可接收的情况下，返回 (false, false)
// 否则，如果 c 处于关闭状态，将 ep 指向的地址清零，返回 (true, false)
// 否则，用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空，则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    if c == nil {
        if !block {    // 非阻塞
            return  // （false，false）
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // 在非阻塞模式下，快速检测到失败，不用获取锁，快速返回
    // 当我们观察到 channel 没准备好接收：
    // 1. 非缓冲型，等待发送列队 sendq 里没有 goroutine 在等待
    // 2. 缓冲型，但 buf 里没有元素
    // 之后，又观察到 closed == 0，即 channel 未关闭。
    // 因为 channel 不可能被重复打开，所以前一个观测的时候 channel 也是未关闭的，
    // 因此在这种情况下可以直接宣布接收失败，返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()    
    }

    lock(&c.lock)

  // channel 已关闭，并且循环数组 buf 里没有元素
    // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
    // 也就是说即使是关闭状态，但在缓冲型的 channel，
    // buf 里有元素的情况下还能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        // 从一个已关闭的 channel 执行接收操作，且未忽略返回值
    // 那么接收的值将是一个该类型的零值
    // typedmemclr 根据类型清理相应地址的内存
    if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
    // 从一个已关闭的 channel 接收，selected 会返回true
        return true, false
    }

  // 等待发送队列里有 goroutine 存在，说明 buf 是满的
    // 这有可能是：
    // 1. 非缓冲型的 channel
    // 2. 缓冲型的 channel，但 buf 满了
    // 针对 1，直接进行内存拷贝（从 sender goroutine -> receiver goroutine）
    // 针对 2，接收到循环数组头部的元素，并将发送者的元素放到循环数组尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

  // 缓冲型，buf 里有元素，可以正常接收
    if c.qcount > 0 {
        // Receive directly from queue
    // 直接从循环数组里找到要接收的元素
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
    // 代码里，没有忽略要接收的值，不是 "<- ch"，而是 "val <- ch"，ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
    // 清理掉循环数组里相应位置的值。
        typedmemclr(c.elemtype, qp)
    // 环形队列，队首游标左移
        c.recvx++
    // 队首游标溢出检查
        if c.recvx == c.dataqsiz {
            c.recvx = 0    // 置0
        }
    // 数组元素减1
        c.qcount--
        unlock(&c.lock)
    // 从一个未关闭的channel接收到值
        return true, true        
    }

    if !block {
        unlock(&c.lock)
    // 非阻塞接收，selected 返回 false，因为没有接收到值
        return false, false
    }

    // no sender available: block on this channel.
  // 阻塞接收
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
  // 写待接收是数据的地址
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 阻塞。进入等待接收队列
  c.recvq.enqueue(mysg)
  // 将当前goroutine挂起
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
  // 被唤醒了
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
  // 成功接收到数据（true），以及channel的开关状态（是否被关闭了）
    return true, !closed
}
```

从`recvchan`中我们得知：

1. 在非阻塞模式下并且channel没有被关闭。如果是非缓冲型channel，等待发送列队 sendq 里没有 goroutine 在等待，又或者是缓冲型channel，但 buf 里没有元素。select将无法从中获得数据，selected == false，这里的`received`也为`false`会是代表channel已经被关闭了？其实不是，那只是在`recv`也就是`<- ch`场景下我们可以这样认为，这也是为什么在上面解释的原因。

   ```go
   // compiler implements
   //
   //    select {
   //    case v = <-c:
   //        ... foo
   //    default:
   //        ... bar
   //    }
   //
   // as
   //
   //    if selectnbrecv(&v, c) {
   //        ... foo
   //    } else {
   //        ... bar
   //    }
   //
   func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
       selected, _ = chanrecv(c, elem, false)
       return
   }

   // compiler implements
   //
   //    select {
   //    case v, ok = <-c:
   //        ... foo
   //    default:
   //        ... bar
   //    }
   //
   // as
   //
   //    if c != nil && selectnbrecv2(&v, &ok, c) {
   //        ... foo
   //    } else {
   //        ... bar
   //    }
   //
   func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
       // TODO(khr): just return 2 values from this function, now that it is in Go.
       selected, *received = chanrecv(c, elem, false)
       return
   }

   // block参数值为false，那么对一个nil chan进行select并不会像接收操作那样panic
   // 如果是一个 nil 的 channel
   if c == nil {
     // 如果不阻塞，直接返回 (false, false)
     if !block {
       return
     }
     // 否则，接收一个 nil 的 channel，goroutine 挂起
     gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
     // 不会执行到这里
     throw("unreachable")
   }

   func TestChannelSelectFromNil(t *testing.T) {
       var ch chan int
       var x int
       var ok bool

       var cnt = 12
       var hit int
       for cnt > 0 {
           select {
           case x = <-ch:
               fmt.Println(x)
           case x, ok = <-ch:
               fmt.Println(x, ok)
           default:
               //fmt.Println("default")
               hit++
           }
           cnt--
       }
       fmt.Println(hit)    // 12
   }
   ```
2. channel已经关闭，并且里面已经没有元素，我们仍然可以从中获得元素（得到的是channel容器元素中类型的零值）
3. 如果等待发送队列中有goroutine存在。如果是非缓冲型channel，那么直接将其拷贝到接收者的栈；如果是缓冲型channel，那么将弹出队首元素，将其添加到队尾。
4. 如果缓冲型channel中有元素，那么对于"val <- ch"这种方式的接收则会通过`typedmemmove`进行拷贝，然后回收环形队列的队首。
5. 如果channel处于阻塞状态并且等待发送队列中不存在goroutine，那么当前go routine将会被挂起，直到有数据被send进channel而被唤醒。

## 发送

和分析`<- ch`一样，我们通过查看汇编代码得知`ch <-`最后调用的是`chansend`函数。

```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
    // 不能阻塞，直接返回 false，表示未发送成功
        if !block {
            return false
        }
    // 阻塞，当前 goroutine 被挂起
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是：
    // 1. channel 是非缓冲型的，且等待接收队列里没有 goroutine
    // 2. channel 是缓冲型的，但循环数组已经装满了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

  // 如果channel关闭了，向里面发送数据则会panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

  // 如果等待接收队列里有goroutine，则直接将其拷贝给接收gouroutine
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

  // 对于缓冲型channel，如果还有缓冲空间
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep) //  将数据从ep处拷贝到qp处
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true // 发送成功
    }

  // 如果是非阻塞模式下，则直接提示发送失败
    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
  // 当前gorutine进入等待发送队列
    c.sendq.enqueue(mysg)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
  // 被唤醒，有机会发送了
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
    // 被唤醒后，发现channel被关闭了，那么将panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil    // 解除channel绑定信息
    releaseSudog(mysg)
    return true
}
```

发送数据会有以下情况：

1. 如果channel为nil，在非阻塞模式下会提示发送失败（return false），在阻塞模式下当前goroutine则会被挂起
2. 在非阻塞模式下，如果发送数据给非缓冲型channel或者是元素已满的缓冲型channel，则会提示发送失败。
3. 如果当前channel已经被关闭，当尝试向其发送数据时会panic
4. 如果等待接收队列里有goroutine，直接将要发送的数据拷贝到接收goroutine。（当等待接收队列里有goroutine的时候说明，该channel是非缓冲型channel或者是元素为空的缓冲型channel）
5. 如果是缓冲型channel并且还有缓冲空间，那么将数据拷贝到buf数组中（环形队列），提示发送成功
6. 如果是缓冲型channel并且已经满了，那么将被挂起，直到被唤醒。

## 关闭

关闭channel，会执行函数`closechan`

```go
func closechan(c *hchan) {
  // 关闭一个nil channel，panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
  // 关闭一个非打开状态的channel，panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

  // 修改channel状态为 已关闭
    c.closed = 1

    var glist gList

    // release all readers
  // 将 channel 所有等待接收队列的里 sudog 释放
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
  // 将 channel 等待发送队列里的 sudog 释放
    // 如果存在，这些 goroutine 将会 panic
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
  // 将goroutine唤醒
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}
```

从代码得知：

1. 如果我们关闭一个`nil channel`会panic
2. 如果我们关闭一个非打开状态的channel，会panic
3. 成功关闭一个channel的话。如果等待接收队列里还有goroutine那么将会被释放；如果等待发送队列里还有goroutine将会被释放，只是这些goroutine都是因为panic而异常退出的。

## channel异常总结

![channel异常总结](https://122767808-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MBDSdFU2kWrJocOCeSH%2Fsync%2F65e0965582d66cd56f0fc5cf9fba11e9381a7bbf.png?generation=1600008660409847\&alt=media)

**关闭已经关闭的`channel`也会引发`panic`**

## channel的应用

### 停止信号

```go
// 摘自go时间轮实现，通过stop channel传递stop信号
// Stop 停止时间轮
func (tw *wheel) Stop() {
    tw.stop <- struct{}{}
}

// Scheduler 调度
func (tw *WheelWithHeap) scheduler() {
    for {
        select {
        case <-tw.ticker.C:
            tw.handleTick() // 处理延时任务
        case aTask := <-tw.taskChannel.add:
            tw.registerTask(aTask) //  注册延时任务
        case rTask := <-tw.taskChannel.remove:
            tw.collectTask(rTask) // 回收延时任务
        case <-tw.stop:
            tw.exited = true
            tw.ticker.Stop()
            return
        }
    }
}
```

### 任务定时

延时任务

```go
select {
    case <-time.After(100 * time.Millisecond):
    case <-s.stopc:
        return false
}
```

定期任务

```go
func worker() {
    ticker := time.Tick(1 * time.Second)
    for {
        select {
        case <- ticker:
            // 执行定时任务
            fmt.Println("执行 1s 定时任务")
        }
    }
}
```

### 解耦生产方和消费方

```go
func TestChannelUseWithWorker(t *testing.T) {
    var ch = make(chan int, 100)
    wg.Add(10)
    for i := 0; i < 10; i++ {
        ch <- i
    }
    go worker(ch)
    wg.Wait()
}

func worker(ch <-chan int) {
    // 启动 5 个工作协程
    for i := 0; i < 5; i++ {
        go func(id int) {
            for {
                select {
                case task := <-ch:
                    fmt.Printf("finish task: %d by worker %d\n", task, id)
                    time.Sleep(time.Second)
                    wg.Done()
                default:
                }
            }
        }(i)
    }
}
```

### 控制并发

```go
// 参考https://github.com/1005281342/test_tools/blob/master/survey/func.go
func Func2(f func(interface{}), cnt int) func(int) {
    return func(n int) {
        ch := make(chan int, cnt)
        for i, v := range RandShuffle(n) {
            wg.Add(1)
            go func(a int) {
        ch <- i
                defer func() {
          // 避免f(a)异常退出导致阻塞
                    <-ch
          wg.Done()
                }()
                f(a)
            }(v)
        }

        for i := 0; i < cnt; i++ {
            ch <- i
        }
        wg.Wait()
    }
}
```

## 内存泄露

“通道可能会引发goroutine leak，确切地说，是指goroutine处于发送或接收阻塞状态，但一直未被唤醒。垃圾回收器并不收集此类资源，导致它们会在等待队列里长久休眠，形成资源泄漏。”

摘录来自: 雨痕. Go语言学习笔记。

```go
func test() { 
   c:=make(chan int) 

   for i:=0;i<10;i++ { 
       go func() { 
            <-c
        }() 
    } 
} 

func main() { 
   test() 

   for{ 
       time.Sleep(time.Second) 
       runtime.GC()          // 强制垃圾回收 
    } 
}
// $go build-o test
// $GODEBUG="gctrace=1,schedtrace=1000,scheddetail=1" ./test
// 从监控结果可以看到大量goroutine一直处于chan receive状态，无法结束
```

## 参考

[深度解密Go语言之channel](https://qcrao.com/2019/07/22/dive-into-go-channel/)

[Golang 源码导读 —— channel](https://studygolang.com/articles/21586)

[我可能并不会使用golang chan](https://juejin.im/post/6844904164951080973)

[Go语言基础之并发](https://www.liwenzhou.com/posts/Go/14_concurrence/)

雨痕. Go语言学习笔记
