go channel

Created

2024-10-28 13:28:46

Updated

2024-10-28 13:28:48

1 基本操作

Tip
  • 无缓冲:假设快递, 在没有蜂巢的情况下, 快递员带着包裹来了, 你人不在, GG, 阻塞了,等待
  • 有缓冲:有蜂巢后, 包裹到了, 快递员直接将包裹放到蜂巢就行, 但是如果蜂巢满了, GG,阻塞了,等待
    • 当然如果 客户在的话, 那么直接交给客户就是了
  • 上面2点已经说明了,什么情况下是不阻塞的
func TestChan(t *testing.T) {
    //定义
    var intChan chan int
    // 指针,引用类型
    intChan = make(chan int, 3)
    fmt.Printf("intChar的值=%v intChan本身的地址=%p\n", intChan, &intChan)
    go func() {
        // 元素个数为空时读 会阻塞等待
        fmt.Println("读取管道等待中...")
        num2 := <-intChan
        fmt.Println("读取完毕...", num2)
    }()
    time.Sleep(2 * time.Second)
    // 向管道写入数据
    intChan <- 1
    intChan <- 2
    intChan <- 3
    // 打印管道的长度和容量
    // 容量是不变的,长度根据管道里面存放着多少个数据
    fmt.Printf("intChan len=%d cap=%d\n", len(intChan), cap(intChan))
    intChan <- 4
    go func() {
        // 数据写入,满了,再写 会阻塞等待
        fmt.Println("写入管道等待中...")
        intChan <- 5
        fmt.Println("写入管道完毕...")
    }()

    time.Sleep(3 * time.Second)
    //读取管道数据
    // 无变量接收
    <-intChan
    // 有变量接收
    num, ok := <-intChan
    fmt.Println("读取:",ok, num)

    time.Sleep(5 * time.Second)
    // 关闭管道后,不能写入数据,但是可以读取
    close(intChan)

    num = <-intChan
    fmt.Println(num)
    num = <-intChan
    fmt.Println(num)
    // 已经没有元素了, 但是还是可以读取操作, 值是0
    num, ok = <-intChan
    fmt.Println(num,ok) // 0 false
}
func main() {
    // 无缓冲
    ch := make(chan int)
    // 没有消费者, 运行会直接报错
    ch <- 1
}
func main() {
    ch := make(chan int)
    // 启动一个goroutine将数据发送到通道中
    go func() {
        for i := 1; i <= 5; i++ {
            ch <- i // 发送数据到通道
            time.Sleep(time.Second)

        }
        close(ch) // 关闭通道
    }()

    // 使用for range迭代读取通道中的值
    // 如果没有 close 会超时退出
    for num := range ch {
        fmt.Println(num)
    }
}
  • select

2 底层原理

2.1 hchan数据结构

思考 (以有缓冲为例)
  1. channel 可能占用的内存空间比较大, 那么将实际数据存放在堆上
  2. 操作上FIFO,那么 使用一个数组来存数据 ,环形的
    1. 需要一个索引指出下一个读的下标位置
    2. 需要一个索引指出下一个写的下标位置
  3. 需要存储的元素的类型信息
  4. 线程安全的, 那么肯定有锁这样的东西
  5. 缓冲区为空时,在读这个chan的goroutine会阻塞, 当chan 有数据时,会开始读取数据,所以chan需要知道谁在等待读自己
  6. 当缓冲区满了后,往chan写数据的goroutine 就会阻塞,等空出位置了, 之前阻塞的goroutine 会重新开始写数据,所以需要知道谁在等待写该chan
  7. close 操作后, 不能写入数据了, 需要记录 是否close的信息
查看汇编知道 runtime.makechan
type hchan struct {
    qcount   uint           // 当前存放的元素个数
    dataqsiz uint           // 数组的容量
    buf      unsafe.Pointer // 数组的地址
    elemsize uint16 
    closed   uint32  // 
    elemtype *_type // element type
    sendx    uint   // 下一个写的下标
    recvx    uint   // 下一个读的下标
    recvq    waitq  // 读等待队列
    sendq    waitq  // 写等待队列
    lock mutex  // 锁
}

2.2 recvq和sendq何时有数据

Tip
  1. 有缓冲chan,缓冲区满了,还往里写的goroutine 会加入 sendq 队列
  2. 有缓冲chan,缓冲区空的,从ch里读的goroutine 会加入 recvq 队列
  3. 无缓冲chan,无人接收时,往里写的goroutine 会加入 sendq 队列
  4. 无缓冲chan,无人写入时,从ch里读的goroutine 会加入 recvq 队列
用于检验sendq和recvq的demo
package test

import (
    "bytes"
    "fmt"
    "runtime"
    "strconv"
    "testing"
    "time"
    "unsafe"
)

type hchan struct {
    qcount   uint
    dataqsiz uint
    buf      unsafe.Pointer
    elemsize uint16
    closed   uint32
    elemtype *int64
    sendx    uint 
    recvx    uint
    recvq    waitq 
    sendq    waitq
}
type waitq struct {
    first *sudog
    last  *sudog
}
type sudog struct {
    g    *g
    next *sudog
    prev *sudog
}
type g struct {
    // 通过源码g 的结构, 前面字段占用的字节.
    // 版本不一样可能 这里也会不一样.
    tmp  [19]uint64
    goid uint64
}

// 获取 goroutine 的id
func GetGID() uint64 {
    b := make([]byte, 64)
    b = b[:runtime.Stack(b, false)]
    b = bytes.TrimPrefix(b, []byte("goroutine "))
    b = b[:bytes.IndexByte(b, ' ')]
    n, _ := strconv.ParseUint(string(b), 10, 64)
    return n
}

func TestMain(t *testing.T) {
    fmt.Println("主 goroutine:", GetGID())
    // ch := make(chan int) 无缓冲验证
    ch := make(chan int, 5)
    chAddr := *(*uintptr)(unsafe.Pointer(&ch))
    go func() {

        fmt.Println("这个goroutine A的 id:", GetGID())
        for i := 0; i < 10; i++ {
            time.Sleep(50 * time.Millisecond)
            ch <- i
        }

    }()
    go func() {

        time.Sleep(1 * time.Second)
        fmt.Println("这个goroutine B的 id:", GetGID())
        for i := 11; i < 20; i++ {
            time.Sleep(50 * time.Millisecond)
            ch <- i
        }
    }()
    go func() {

        for {
            hchanData := *(*hchan)(unsafe.Pointer(chAddr))
            fmt.Println(hchanData)
            if hchanData.sendq.first != nil {
                fmt.Println("hchan结构里sendq存的 first goroutine id: ", hchanData.sendq.first.g.goid)
                if hchanData.sendq.first.next != nil {
                    fmt.Println("hchan结构里sendq存的first goroutine id ->next: ", hchanData.sendq.first.next.g.goid)
                }
            }
            if hchanData.sendq.last != nil {
                fmt.Println("hchan结构里sendq存的last goroutine id: ", hchanData.sendq.last.g.goid)
                if hchanData.sendq.last.prev != nil {
                    fmt.Println("hchan结构里sendq存的last goroutine id ->prev: ", hchanData.sendq.last.prev.g.goid)
                }

            }
            // for select case 处读取chan, 到没有元素了,就会在recvq添加读取它goroutine
            if hchanData.recvq.first != nil {
                fmt.Println("当前ch 的元素个数:", len(ch))
                // 主协程 最后一直等待读取 chan
                fmt.Println("hchan结构里 recvq 存的first goroutine id: ", hchanData.recvq.first.g.goid)
                close(ch)
            }
            time.Sleep(1 * time.Second)
        }
    }()

    time.Sleep(5 * time.Second)

Loop:
    for {
        select {
        case c, ok := <-ch:
            if !ok {
                break Loop
            }
            fmt.Println(c)
        }
        time.Sleep(500 * time.Millisecond)
    }
}

2.3 写 runtime.chansend1

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }

        //chan是nil的情况, 直接 阻塞
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // ...

    if !block && c.closed == 0 && full(c) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    // 准备开始发送数据,先加锁
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))  // (1)
    }
    // 如果这个时候有接收者在等待数据,那么直接发给它,不用写入缓冲区
    // 接收者在等待意味着缓冲区里本身是没数据的
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    // 如果缓冲区有空余的情况
    if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }
    // 走到这里的话, 意味着缓冲没空余且无接收者,这个时候就是 添加到sendq
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    KeepAlive(ep)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    return true
}
  1. 测试代码 go // 执行会报错 panic: send on closed channel func TestSendToCloseChan(t *testing.T) { ch := make(chan int, 2) ch <- 1 close(ch) ch <- 2 }

2.4 读 runtime.chanrecv1

Back to top