go channel
1 基本操作
Tip
- 无缓冲:假设快递, 在没有蜂巢的情况下, 快递员带着包裹来了, 你人不在, GG, 阻塞了,等待
- 有缓冲:有蜂巢后, 包裹到了, 快递员直接将包裹放到蜂巢就行, 但是如果蜂巢满了, GG,阻塞了,等待
- 当然如果 客户在的话, 那么直接交给客户就是了
- 上面2点已经说明了,什么情况下是不阻塞的
func TestChan(t *testing.T) {
//定义
var intChan chan int
// 指针,引用类型
intChan = make(chan int, 3)
fmt.Printf("intChar的值=%v intChan本身的地址=%p\n", intChan, &intChan)
go func() {
// 元素个数为空时读 会阻塞等待
fmt.Println("读取管道等待中...")
num2 := <-intChan
fmt.Println("读取完毕...", num2)
}()
time.Sleep(2 * time.Second)
// 向管道写入数据
intChan <- 1
intChan <- 2
intChan <- 3
// 打印管道的长度和容量
// 容量是不变的,长度根据管道里面存放着多少个数据
fmt.Printf("intChan len=%d cap=%d\n", len(intChan), cap(intChan))
intChan <- 4
go func() {
// 数据写入,满了,再写 会阻塞等待
fmt.Println("写入管道等待中...")
intChan <- 5
fmt.Println("写入管道完毕...")
}()
time.Sleep(3 * time.Second)
//读取管道数据
// 无变量接收
<-intChan
// 有变量接收
num, ok := <-intChan
fmt.Println("读取:",ok, num)
time.Sleep(5 * time.Second)
// 关闭管道后,不能写入数据,但是可以读取
close(intChan)
num = <-intChan
fmt.Println(num)
num = <-intChan
fmt.Println(num)
// 已经没有元素了, 但是还是可以读取操作, 值是0
num, ok = <-intChan
fmt.Println(num,ok) // 0 false
}func main() {
ch := make(chan int)
// 启动一个goroutine将数据发送到通道中
go func() {
for i := 1; i <= 5; i++ {
ch <- i // 发送数据到通道
time.Sleep(time.Second)
}
close(ch) // 关闭通道
}()
// 使用for range迭代读取通道中的值
// 如果没有 close 会超时退出
for num := range ch {
fmt.Println(num)
}
}- select
2 底层原理
2.1 hchan数据结构
思考 (以有缓冲为例)
- channel 可能占用的内存空间比较大, 那么将实际数据存放在堆上
- 操作上FIFO,那么 使用一个数组来存数据 ,环形的
- 需要一个索引指出下一个读的下标位置
- 需要一个索引指出下一个写的下标位置
- 需要存储的元素的类型信息
- 线程安全的, 那么肯定有锁这样的东西
- 缓冲区为空时,在读这个chan的goroutine会阻塞, 当chan 有数据时,会开始读取数据,所以chan需要知道谁在等待读自己
- 当缓冲区满了后,往chan写数据的goroutine 就会阻塞,等空出位置了, 之前阻塞的goroutine 会重新开始写数据,所以需要知道谁在等待写该chan
- close 操作后, 不能写入数据了, 需要记录 是否close的信息
2.2 recvq和sendq何时有数据
Tip
- 有缓冲chan,缓冲区满了,还往里写的goroutine 会加入 sendq 队列
- 有缓冲chan,缓冲区空的,从ch里读的goroutine 会加入 recvq 队列
- 无缓冲chan,无人接收时,往里写的goroutine 会加入 sendq 队列
- 无缓冲chan,无人写入时,从ch里读的goroutine 会加入 recvq 队列
用于检验sendq和recvq的demo
package test
import (
"bytes"
"fmt"
"runtime"
"strconv"
"testing"
"time"
"unsafe"
)
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *int64
sendx uint
recvx uint
recvq waitq
sendq waitq
}
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
next *sudog
prev *sudog
}
type g struct {
// 通过源码g 的结构, 前面字段占用的字节.
// 版本不一样可能 这里也会不一样.
tmp [19]uint64
goid uint64
}
// 获取 goroutine 的id
func GetGID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
func TestMain(t *testing.T) {
fmt.Println("主 goroutine:", GetGID())
// ch := make(chan int) 无缓冲验证
ch := make(chan int, 5)
chAddr := *(*uintptr)(unsafe.Pointer(&ch))
go func() {
fmt.Println("这个goroutine A的 id:", GetGID())
for i := 0; i < 10; i++ {
time.Sleep(50 * time.Millisecond)
ch <- i
}
}()
go func() {
time.Sleep(1 * time.Second)
fmt.Println("这个goroutine B的 id:", GetGID())
for i := 11; i < 20; i++ {
time.Sleep(50 * time.Millisecond)
ch <- i
}
}()
go func() {
for {
hchanData := *(*hchan)(unsafe.Pointer(chAddr))
fmt.Println(hchanData)
if hchanData.sendq.first != nil {
fmt.Println("hchan结构里sendq存的 first goroutine id: ", hchanData.sendq.first.g.goid)
if hchanData.sendq.first.next != nil {
fmt.Println("hchan结构里sendq存的first goroutine id ->next: ", hchanData.sendq.first.next.g.goid)
}
}
if hchanData.sendq.last != nil {
fmt.Println("hchan结构里sendq存的last goroutine id: ", hchanData.sendq.last.g.goid)
if hchanData.sendq.last.prev != nil {
fmt.Println("hchan结构里sendq存的last goroutine id ->prev: ", hchanData.sendq.last.prev.g.goid)
}
}
// for select case 处读取chan, 到没有元素了,就会在recvq添加读取它goroutine
if hchanData.recvq.first != nil {
fmt.Println("当前ch 的元素个数:", len(ch))
// 主协程 最后一直等待读取 chan
fmt.Println("hchan结构里 recvq 存的first goroutine id: ", hchanData.recvq.first.g.goid)
close(ch)
}
time.Sleep(1 * time.Second)
}
}()
time.Sleep(5 * time.Second)
Loop:
for {
select {
case c, ok := <-ch:
if !ok {
break Loop
}
fmt.Println(c)
}
time.Sleep(500 * time.Millisecond)
}
}2.3 写 runtime.chansend1
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
//chan是nil的情况, 直接 阻塞
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// ...
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 准备开始发送数据,先加锁
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel")) // (1)
}
// 如果这个时候有接收者在等待数据,那么直接发给它,不用写入缓冲区
// 接收者在等待意味着缓冲区里本身是没数据的
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 如果缓冲区有空余的情况
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 走到这里的话, 意味着缓冲没空余且无接收者,这个时候就是 添加到sendq
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}- 测试代码
go // 执行会报错 panic: send on closed channel func TestSendToCloseChan(t *testing.T) { ch := make(chan int, 2) ch <- 1 close(ch) ch <- 2 }