> For the complete documentation index, see [llms.txt](https://1005281342.gitbook.io/code-porter/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://1005281342.gitbook.io/code-porter/yu-fa-ji-chu/channel/0_channel.md).

# channel：从理解源码到应用

“不要通过共享内存来通信，而要通过通信来实现内存共享”

channel 的底层就是通过 mutex 来控制并发的。只是 channel 是更高一层次的并发编程原语，封装了更多的功能。

使用原子函数、读写锁可以保证资源的共享访问安全，但使用 channel 更优雅

`lock` 用来保证每个读 channel 或写 channel 的操作都是原子的

## 为什么要channel

Go 通过 channel 实现 CSP 通信模型，主要用于 goroutine 之间的消息传递和事件通知。

有了 channel 和 goroutine 之后，Go 的并发编程变得异常容易和安全，得以让程序员把注意力留到业务上去，实现开发效率的提升。

## 数据结构

```go
type hchan struct {
    qcount   uint           // total data in the queue 当前队列（chan）中的元素个数
    dataqsiz uint           // size of the circular queue    环形队列容量
    buf      unsafe.Pointer // points to an array of dataqsiz elements 指针，指向底层环形队列存储（数组）
    elemsize uint16    // chan中元素大小 // c.elemsize = uint16(elem.size)
    closed   uint32    // 是否关闭
    elemtype *_type // element type channel容器内的元素类型
    sendx    uint   // send index    // 已发送元素在循环数组中的索引
    recvx    uint   // receive index // 已接收元素在循环数组中的索引
    recvq    waitq  // list of recv waiters // 等待接收的 goroutine 队列（双向链表）
    sendq    waitq  // list of send waiters // 等待发送的 goroutine 队列（双向链表）

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex    // 锁
}
```

![img](/files/-MH6wbtTIi9bAAbCMG74)

解释：

`buf`指向底层数组，缓冲型`channel`会分配数组，非缓冲型数组则不分配。

`sendx`，`recvx`指向底层循环数组下标，表示当前可以发送和接受元素索引值，也就是环形队列中的队首和队尾。

`waitq`是元素为`sudog`的一个双向链表，而`sudog`是对`goroutine`的一个封装

```go
type waitq struct {
    first *sudog
    last  *sudog
}
```

`lock`用来保证每个读`channel`或写`channel`的操作都是原子的。

例如，现在有一个容量为 6 的，元素为 int 型的 channel ，其数据结构如下 ：

![img](/files/-MH6wbtVFiuhJk4od_g1)

解释：

`qcount(4)`表示队列中有4个元素

`dataqsiz(6)`表示队列的容量为6

`buf`指向底层数组（环形队列）

`elemsize(8)`表示channel容器中的元素类型大小。比如对于`chan int`，其值为8

```go
// var x int = 100
// fmt.Println(unsafe.Sizeof(x)) // 8 单位bit
```

## 创建channel

### channel按是否带缓冲分类

Channel 分为两种：带缓冲、不带缓冲。对不带缓冲的 channel 进行的操作实际上可以看作“同步模式”，带缓冲的则称为“异步模式”。

**不带缓冲的channel（同步模式）**

```go
func TestChannelBlocking(t *testing.T) {
    var ch = make(chan struct{})

    // 这个时候没有消费者就绪，此处会阻塞
    //ch <- struct{}{}    // fatal error: all goroutines are asleep - deadlock!

    wg.Add(1)
    // go 一个消费者
    go func(ch chan struct{}) {
        <-ch
        wg.Done()
    }(ch)

    // 生产者生成消息
    ch <- struct{}{}
    wg.Wait()
}
```

同步模式下，发送方和接收方要同步就绪，只有在两者都 ready 的情况下，数据才能在两者间传输（后面会看到，实际上就是内存拷贝）。否则，任意一方先行进行发送或接收操作，都会被挂起，等待另一方的出现才能被唤醒。

**带缓冲的channel（异步模式）**

```go
func TestChannelNonBlocking(t *testing.T) {
    var ch = make(chan struct{}, 2) // 可以缓冲2个元素
    // 生产者生成消息
    ch <- struct{}{}

    wg.Add(1)
    // go 一个消费者
    go func(ch chan struct{}) {
        <-ch
        wg.Done()
    }(ch)

    // 生产者生成消息
    ch <- struct{}{}
    wg.Wait()
}
```

异步模式下，在缓冲槽可用的情况下（有剩余容量），发送和接收操作都可以顺利进行。否则，操作的一方（如写入）同样会被挂起，直到出现相反操作（如接收）才会被唤醒。

小结一下：同步模式下，必须要使发送方和接收方配对，操作才会成功，否则会被阻塞；异步模式下，缓冲槽要有剩余容量，操作才会成功，否则也会被阻塞。

### channel按读写分类

**只读channel**

```go
func goroutineA(a <-chan int) {
    val := <-a
    fmt.Println("G1 received data: ", val)
    wg.Done()
    return
}

func goroutineB(b <-chan int) {
    // b <- 10 这是一个错误的操作，因为b是一个只读channel，也就是说对于b来说没有写权限
    val := <-b
    fmt.Println("G2 received data: ", val)
    wg.Done()
    return
}

func TestChannelRecv(t *testing.T) {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    wg.Add(1)
    ch <- 3
    wg.Wait()
}
```

对于`<-chan`类型来说，表示这个chan只有读权限

**只写channel**

```go
func goroutineC(a chan<- int) {
    a <- 1
    fmt.Println("C send data: 1")
    return
}

func goroutineD(a <-chan int) {
    val := <-a
    fmt.Println("D received data from C: ", val)
    wg.Done()
    return
}

func TestChannelSend(t *testing.T) {
    ch := make(chan int)
    wg.Add(1)
    go goroutineD(ch)
    go goroutineC(ch)
    wg.Wait()
}
```

小结一下：我们可以通过设置`channel`的读写权限来明确每个函数的身份（是消费者还是生产者，或者既是生产者又是消费者），同时可以有效避免只读成员发生误写操作、只写成员消费数据。

### 创建一个channel

我们可以通过`make`来创建一个channel，如`var ch = make(chan int)`。那么使用`make`创建一个channel都发生了什么呢？

```go
func main() {
    makeChan()
}

func makeChan()  {
    _ = make(chan int)
}
//  CALL    runtime.makechan(SB)
```

通过查看汇编代码发现，是通过调用`makechan`来创建channel的，我们来看看`makechan`长什么样子吧。

```go
func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
  if elem.size >= 1<<16 {    // 从这里我们得到信息：channel最大容量为（(1<<16) - 1）
        throw("makechan: invalid channel element type")
    }

  // 检查对齐字节数
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")    
    }

  // 获取要分配的内存
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0: 
        // Queue or element size is zero. // 无缓冲类型
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size) // chan的大小

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}
```

从`makechan`中我们得到下面信息：

1. chan支持最大容量为`(1<<16) - 1`
2. 对于无缓冲类型chan，不会去申请buf数组
3. 我们传入的size（chan的容量），最终决定底层环形数组的长度

## 接收

### 接收值函数重载

从一个channel中获取值有两种形式

```go
var x = <-ch    // 当接收到相应类型的零值时无法知道是真实的发送者发送过来的值，还是 channel 被关闭后，返回给接收者的默认类型的零值
var xx, ok = <- ch // 如果ok == false，表示channel已经被关闭。
// 可能看到这里会有疑惑，为什么ok == false就说明channel是被关闭的？对于接收值函数的实现，传入的block参数值为true，就是说不会命中需要满足!block的分支，此外从其他分支看，对于已关闭的channel该值为false，对于未关闭的channel该值为true。因此在recv场景下，我们可以任务ok==false时，表示channel已经被关闭了。

func goroutineE(a chan<- int) {
    a <- 1
    fmt.Println("E send data: 1")
    close(a)
    return
}

func goroutineF(a <-chan int) {
    var ok bool
    val := <-a
    fmt.Printf("F received data from E %d, ok: %v \n", val, ok)
    val, ok = <-a
    fmt.Printf("F received data from E %d, ok: %v \n", val, ok)
    wg.Done()
    return
}

func TestChannelClose(t *testing.T) {
    ch := make(chan int)
    wg.Add(1)
    go goroutineE(ch)
    go goroutineF(ch)
    wg.Wait()
}
/*
=== RUN   TestChannelClose
E send data: 1
F received data from E 1, ok: false 
F received data from E 0, ok: false 
--- PASS: TestChannelClose (0.00s)
PASS
*/
```

通过反汇编处理，发现分别调用的是下面两种函数

```go
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}
```

其实都是调用`chanrecv`

```go
// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil，说明忽略了接收值。
// 如果 block == false，即非阻塞型接收，在没有数据可接收的情况下，返回 (false, false)
// 否则，如果 c 处于关闭状态，将 ep 指向的地址清零，返回 (true, false)
// 否则，用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空，则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    if c == nil {
        if !block {    // 非阻塞
            return  // （false，false）
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // 在非阻塞模式下，快速检测到失败，不用获取锁，快速返回
    // 当我们观察到 channel 没准备好接收：
    // 1. 非缓冲型，等待发送列队 sendq 里没有 goroutine 在等待
    // 2. 缓冲型，但 buf 里没有元素
    // 之后，又观察到 closed == 0，即 channel 未关闭。
    // 因为 channel 不可能被重复打开，所以前一个观测的时候 channel 也是未关闭的，
    // 因此在这种情况下可以直接宣布接收失败，返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()    
    }

    lock(&c.lock)

  // channel 已关闭，并且循环数组 buf 里没有元素
    // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
    // 也就是说即使是关闭状态，但在缓冲型的 channel，
    // buf 里有元素的情况下还能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        // 从一个已关闭的 channel 执行接收操作，且未忽略返回值
    // 那么接收的值将是一个该类型的零值
    // typedmemclr 根据类型清理相应地址的内存
    if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
    // 从一个已关闭的 channel 接收，selected 会返回true
        return true, false
    }

  // 等待发送队列里有 goroutine 存在，说明 buf 是满的
    // 这有可能是：
    // 1. 非缓冲型的 channel
    // 2. 缓冲型的 channel，但 buf 满了
    // 针对 1，直接进行内存拷贝（从 sender goroutine -> receiver goroutine）
    // 针对 2，接收到循环数组头部的元素，并将发送者的元素放到循环数组尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

  // 缓冲型，buf 里有元素，可以正常接收
    if c.qcount > 0 {
        // Receive directly from queue
    // 直接从循环数组里找到要接收的元素
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
    // 代码里，没有忽略要接收的值，不是 "<- ch"，而是 "val <- ch"，ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
    // 清理掉循环数组里相应位置的值。
        typedmemclr(c.elemtype, qp)
    // 环形队列，队首游标左移
        c.recvx++
    // 队首游标溢出检查
        if c.recvx == c.dataqsiz {
            c.recvx = 0    // 置0
        }
    // 数组元素减1
        c.qcount--
        unlock(&c.lock)
    // 从一个未关闭的channel接收到值
        return true, true        
    }

    if !block {
        unlock(&c.lock)
    // 非阻塞接收，selected 返回 false，因为没有接收到值
        return false, false
    }

    // no sender available: block on this channel.
  // 阻塞接收
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
  // 写待接收是数据的地址
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 阻塞。进入等待接收队列
  c.recvq.enqueue(mysg)
  // 将当前goroutine挂起
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
  // 被唤醒了
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
  // 成功接收到数据（true），以及channel的开关状态（是否被关闭了）
    return true, !closed
}
```

从`recvchan`中我们得知：

1. 在非阻塞模式下并且channel没有被关闭。如果是非缓冲型channel，等待发送列队 sendq 里没有 goroutine 在等待，又或者是缓冲型channel，但 buf 里没有元素。select将无法从中获得数据，selected == false，这里的`received`也为`false`会是代表channel已经被关闭了？其实不是，那只是在`recv`也就是`<- ch`场景下我们可以这样认为，这也是为什么在上面解释的原因。

   ```go
   // compiler implements
   //
   //    select {
   //    case v = <-c:
   //        ... foo
   //    default:
   //        ... bar
   //    }
   //
   // as
   //
   //    if selectnbrecv(&v, c) {
   //        ... foo
   //    } else {
   //        ... bar
   //    }
   //
   func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
       selected, _ = chanrecv(c, elem, false)
       return
   }

   // compiler implements
   //
   //    select {
   //    case v, ok = <-c:
   //        ... foo
   //    default:
   //        ... bar
   //    }
   //
   // as
   //
   //    if c != nil && selectnbrecv2(&v, &ok, c) {
   //        ... foo
   //    } else {
   //        ... bar
   //    }
   //
   func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
       // TODO(khr): just return 2 values from this function, now that it is in Go.
       selected, *received = chanrecv(c, elem, false)
       return
   }

   // block参数值为false，那么对一个nil chan进行select并不会像接收操作那样panic
   // 如果是一个 nil 的 channel
   if c == nil {
     // 如果不阻塞，直接返回 (false, false)
     if !block {
       return
     }
     // 否则，接收一个 nil 的 channel，goroutine 挂起
     gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
     // 不会执行到这里
     throw("unreachable")
   }

   func TestChannelSelectFromNil(t *testing.T) {
       var ch chan int
       var x int
       var ok bool

       var cnt = 12
       var hit int
       for cnt > 0 {
           select {
           case x = <-ch:
               fmt.Println(x)
           case x, ok = <-ch:
               fmt.Println(x, ok)
           default:
               //fmt.Println("default")
               hit++
           }
           cnt--
       }
       fmt.Println(hit)    // 12
   }
   ```
2. channel已经关闭，并且里面已经没有元素，我们仍然可以从中获得元素（得到的是channel容器元素中类型的零值）
3. 如果等待发送队列中有goroutine存在。如果是非缓冲型channel，那么直接将其拷贝到接收者的栈；如果是缓冲型channel，那么将弹出队首元素，将其添加到队尾。
4. 如果缓冲型channel中有元素，那么对于"val <- ch"这种方式的接收则会通过`typedmemmove`进行拷贝，然后回收环形队列的队首。
5. 如果channel处于阻塞状态并且等待发送队列中不存在goroutine，那么当前go routine将会被挂起，直到有数据被send进channel而被唤醒。

## 发送

和分析`<- ch`一样，我们通过查看汇编代码得知`ch <-`最后调用的是`chansend`函数。

```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
    // 不能阻塞，直接返回 false，表示未发送成功
        if !block {
            return false
        }
    // 阻塞，当前 goroutine 被挂起
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是：
    // 1. channel 是非缓冲型的，且等待接收队列里没有 goroutine
    // 2. channel 是缓冲型的，但循环数组已经装满了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

  // 如果channel关闭了，向里面发送数据则会panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

  // 如果等待接收队列里有goroutine，则直接将其拷贝给接收gouroutine
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

  // 对于缓冲型channel，如果还有缓冲空间
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep) //  将数据从ep处拷贝到qp处
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true // 发送成功
    }

  // 如果是非阻塞模式下，则直接提示发送失败
    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
  // 当前gorutine进入等待发送队列
    c.sendq.enqueue(mysg)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
  // 被唤醒，有机会发送了
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
    // 被唤醒后，发现channel被关闭了，那么将panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil    // 解除channel绑定信息
    releaseSudog(mysg)
    return true
}
```

发送数据会有以下情况：

1. 如果channel为nil，在非阻塞模式下会提示发送失败（return false），在阻塞模式下当前goroutine则会被挂起
2. 在非阻塞模式下，如果发送数据给非缓冲型channel或者是元素已满的缓冲型channel，则会提示发送失败。
3. 如果当前channel已经被关闭，当尝试向其发送数据时会panic
4. 如果等待接收队列里有goroutine，直接将要发送的数据拷贝到接收goroutine。（当等待接收队列里有goroutine的时候说明，该channel是非缓冲型channel或者是元素为空的缓冲型channel）
5. 如果是缓冲型channel并且还有缓冲空间，那么将数据拷贝到buf数组中（环形队列），提示发送成功
6. 如果是缓冲型channel并且已经满了，那么将被挂起，直到被唤醒。

## 关闭

关闭channel，会执行函数`closechan`

```go
func closechan(c *hchan) {
  // 关闭一个nil channel，panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
  // 关闭一个非打开状态的channel，panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

  // 修改channel状态为 已关闭
    c.closed = 1

    var glist gList

    // release all readers
  // 将 channel 所有等待接收队列的里 sudog 释放
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
  // 将 channel 等待发送队列里的 sudog 释放
    // 如果存在，这些 goroutine 将会 panic
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
  // 将goroutine唤醒
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}
```

从代码得知：

1. 如果我们关闭一个`nil channel`会panic
2. 如果我们关闭一个非打开状态的channel，会panic
3. 成功关闭一个channel的话。如果等待接收队列里还有goroutine那么将会被释放；如果等待发送队列里还有goroutine将会被释放，只是这些goroutine都是因为panic而异常退出的。

## channel异常总结

![channel异常总结](/files/-MH6wbta2vW458fRSMng)

**关闭已经关闭的`channel`也会引发`panic`**

## channel的应用

### 停止信号

```go
// 摘自go时间轮实现，通过stop channel传递stop信号
// Stop 停止时间轮
func (tw *wheel) Stop() {
    tw.stop <- struct{}{}
}

// Scheduler 调度
func (tw *WheelWithHeap) scheduler() {
    for {
        select {
        case <-tw.ticker.C:
            tw.handleTick() // 处理延时任务
        case aTask := <-tw.taskChannel.add:
            tw.registerTask(aTask) //  注册延时任务
        case rTask := <-tw.taskChannel.remove:
            tw.collectTask(rTask) // 回收延时任务
        case <-tw.stop:
            tw.exited = true
            tw.ticker.Stop()
            return
        }
    }
}
```

### 任务定时

延时任务

```go
select {
    case <-time.After(100 * time.Millisecond):
    case <-s.stopc:
        return false
}
```

定期任务

```go
func worker() {
    ticker := time.Tick(1 * time.Second)
    for {
        select {
        case <- ticker:
            // 执行定时任务
            fmt.Println("执行 1s 定时任务")
        }
    }
}
```

### 解耦生产方和消费方

```go
func TestChannelUseWithWorker(t *testing.T) {
    var ch = make(chan int, 100)
    wg.Add(10)
    for i := 0; i < 10; i++ {
        ch <- i
    }
    go worker(ch)
    wg.Wait()
}

func worker(ch <-chan int) {
    // 启动 5 个工作协程
    for i := 0; i < 5; i++ {
        go func(id int) {
            for {
                select {
                case task := <-ch:
                    fmt.Printf("finish task: %d by worker %d\n", task, id)
                    time.Sleep(time.Second)
                    wg.Done()
                default:
                }
            }
        }(i)
    }
}
```

### 控制并发

```go
// 参考https://github.com/1005281342/test_tools/blob/master/survey/func.go
func Func2(f func(interface{}), cnt int) func(int) {
    return func(n int) {
        ch := make(chan int, cnt)
        for i, v := range RandShuffle(n) {
            wg.Add(1)
            go func(a int) {
        ch <- i
                defer func() {
          // 避免f(a)异常退出导致阻塞
                    <-ch
          wg.Done()
                }()
                f(a)
            }(v)
        }

        for i := 0; i < cnt; i++ {
            ch <- i
        }
        wg.Wait()
    }
}
```

## 内存泄露

“通道可能会引发goroutine leak，确切地说，是指goroutine处于发送或接收阻塞状态，但一直未被唤醒。垃圾回收器并不收集此类资源，导致它们会在等待队列里长久休眠，形成资源泄漏。”

摘录来自: 雨痕. Go语言学习笔记。

```go
func test() { 
   c:=make(chan int) 

   for i:=0;i<10;i++ { 
       go func() { 
            <-c
        }() 
    } 
} 

func main() { 
   test() 

   for{ 
       time.Sleep(time.Second) 
       runtime.GC()          // 强制垃圾回收 
    } 
}
// $go build-o test
// $GODEBUG="gctrace=1,schedtrace=1000,scheddetail=1" ./test
// 从监控结果可以看到大量goroutine一直处于chan receive状态，无法结束
```

## 参考

[深度解密Go语言之channel](https://qcrao.com/2019/07/22/dive-into-go-channel/)

[Golang 源码导读 —— channel](https://studygolang.com/articles/21586)

[我可能并不会使用golang chan](https://juejin.im/post/6844904164951080973)

[Go语言基础之并发](https://www.liwenzhou.com/posts/Go/14_concurrence/)

雨痕. Go语言学习笔记


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://1005281342.gitbook.io/code-porter/yu-fa-ji-chu/channel/0_channel.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
