蹒跚学Go第七天-并发编程

并发

背景

因为Go语言从语言层面支持了并行,所以有人把Go语言比作21世纪的C语言。通常程序会被编写为一个顺序执行并完成一个独立任务的代码,因为这种类型的程序很容易写,也很容易维护。不过也有一些情况下,并行执行多个任务会有更大的好处。

简介

并发和并行是两个不同的概念

  • 并行意味着程序在任意时刻都是同时运行的

  • 并发意味着程序在单位时间内是同时运行的。

图解

解释

  • 并行就是在任一粒度的时间内都具备同时执行的能力最简单的并行就是多机,多台机器并行处理。单机多核(SMP对称多处理结构)表面上看是并行的,但由于是共享内存,以及线程间的同步等,不可能完全做到并行。

  • 并发是在规定的时间内多个请求都得到执行和处理,强调的是给外界的感觉,实际上内部可能是分时操作的。并发重在避免阻塞,使程序不会因为一个阻塞而停止处理。并发典型的应用场景:分时操作系统就是一种并发设计(忽略多核CPU)。将Cpu划分成多个短的时间分片,分配给多个应用使用,即在某一时间点上实际cpu提供个一个应用,只是对我们人所能感知的时间维度来说无感。

三个程

  • 进程(Process):一个正在独立执行的程序,占用系统的资源分配

进程是一个程序在一个数据集中的一次动态执行过程,可以简单理解为“正在执行的程序”,它是CPU资源分配和调度的独立单位。
进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。进程的局限是创建、撤销和切换的开销比较大

  • 线程(Thread):一个进程中的一条执行的路径/单元(指该进程中的一个执行中的操作功能)

线程是在进程之后发展出来的概念。线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。一个进程可以包含多个线程。
线程的优点是减小了程序并发执行时的开销,提高了操作系统的并发性能,缺点是线程没有自己的系统资源,只拥有在运行时必不可少的资源,但同一进程的各线程可以共享进程所拥有的系统资源,如果把进程比作一个车间,那么线程就好比是车间里面的工人。不过对于某些独占性资源存在锁机制,处理不当可能会产生“死锁”。

  • 协程(Coroutine):轻量级的用户线程,占用的资源比较少

协程是一种用户态的轻量级线程,又称微线程,英文名Coroutine,协程的调度完全由用户控制。人们通常将协程和子程序(函数)比较着理解。子程序调用总是一个入口,一次返回,一旦退出即完成了子程序的执行。与传统的系统级线程和进程相比,协程的最大优势在于其“轻量级”,可以轻松创建上百万个而不会导致系统资源衰竭,而线程和进程通常最多也不能超过1万的。这也是协程也叫轻量级线程的原因。

  • 协程的特点在于是一个线程执行,与多线程相比,其优势体现在:协程的执行效率极高。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

goroutine

go中使用Goroutines来实现并发concurrently。Goroutines是与其他函数或方法同时运行的函数或方法。
Goroutines可以被认为是轻量级的线程。与线程相比,创建Goroutine的成本很小,它就是一段代码,一个函数入口。以及在堆上为其分配的一个堆栈(初始大小为4K,会随着程序的执行自动增长删除)。因此它非常廉价,Go应用程序可以并发运行数千个Goroutines。

  • 实现并发:goroutine轻量级别的协程

  • 语法简单轻便:go(关键字) 函数

  • go在运行代码时候首先创建的goroutine为main(),主main()一旦执行结束就会销毁进程不等待正在子goroutine运行状态是否结束
func main(){
    //调用goroutine执行函数
    go hello()
    fmt.Println("main")
    //使用睡眠是保证CPU能够运行完go hello()才结束主程序
    time.Sleep(1*time.Second) 
    //不使用情况下因为CPU是随机去执行,所以绝大多数情况下我们看不见go hello()的输出情况,main()就已经消亡
}
func hello(){
    fmt.Println("goroutine中执行的")
}
输出结果:
main
goroutine中执行的

主goroutine工作流程

封装main函数的goroutine称为主goroutine。
主goroutine所做的事情并不是执行main函数那么简单。它首先要做的是:设定每一个goroutine所能申请的栈空间的最大尺寸。在32位的计算机系统中此最大尺寸为250MB,而在64位的计算机系统中此尺寸为1GB。如果有某个goroutine的栈空间尺寸大于这个限制,那么运行时系统就会引发一个栈溢出(stack overflow)的运行时恐慌。随机,这个go程序的运行也会终止。

  • 1.创建一个特殊的defer语句,用于在主goroutine退出时做必要的善后处理。因为主goroutine也可能非正常的结束

  • 2.启动专用于在后台清扫内存垃圾的goroutine,bin个设置GC可用的标识

  • 3.执行mian包中的init函数(初始化信息如内核设置等)

  • 4.执行main函数:执行完main函数后,它还会检查主goroutine是否引发了运行时恐慌,并进行必要的处理。最后主goroutine会结束自己以及当前进程的运行。

init函数

  • init()函数同main() -> 特殊函数,由我们的系统自动的调用执行 -> main goroutine

  • go语言中init()如果不定义核数默认为1

  • runtime.GOMAXPROCS() //go程序最大执行的核数[1,256]

func init(){
    //可以设置当前cpu的内容
    fmt.Println("当前cpu的核数:",runtime.NumCPU())
runtime.GOMAXPROCS(runtime.NumCPU())
}

图解Goroutine实质

  • G: 表示goroutine,存储了goroutine的执行stack信息、goroutine状态以及goroutine的任务函数等;

  • P: 表示逻辑processor,P的数量决定了系统内最大可并行的G的数量(前提:系统的物理cpu核数>=P的数量);P的最大作用还是其拥有的各种G对象队列、链表、一些cache和状态。

  • M: M代表着真正的执行计算资源。在绑定有效的p后,进入schedule循环;而schedule循环的机制大致是从各种队列、p的本地队列中获取G,切换到G的执行栈上并执行G的函数,调用goexit做清理工作并回到m,如此反复。M并不保留G状态,这是G可以跨M调度的基础。

  • P是一个“逻辑Proccessor”,每个G要想真正运行起来,首先需要被分配一个P(进入到P的local runq中,这里暂忽略global runq那个环节)。对于G来说,P就是运行它的“CPU”,可以说:G的眼里只有P。但从Go scheduler视角来看,真正的“CPU”是M,只有将P和M绑定才能让P的runq中G得以真实运行起来。

G-P-M模型的实现算是Go scheduler的一大进步,但Scheduler仍然有一个头疼的问题,那就是不支持抢占式调度,导致一旦某个G中出现死循环或永久循环的代码逻辑,那么G将永久占用分配给它的P和M,位于同一个P中的其他G将得不到调度,出现“饿死”的情况。更为严重的是,当只有一个P时(GOMAXPROCS=1)时,整个Go程序中的其他G都将“饿死”。那怎么办呢?在GO1.2中,他增加了一个监控,Go程序启动时,runtime会去启动一个名为sysmon的m(一般称为监控线程),该m无需绑定p即可运行,该m在整个Go程序的运行过程中至关重要:向长时间运行的G任务发出抢占调度,收回因syscall长时间阻塞的P等等

go语言的并发

  • 系统自动创建并且启动主goroutine执行对象main()

  • 用于自己创建并且启动子goroutine执行对应函数

  • 子goroutine中执行的函数往往没有返回值,如果有也会被舍弃

goroutine特点

  • go的执行是非阻塞的,不会等待。

  • go后面的函数的返回值会被忽略。

  • 调度器不能保证多个goroutine的执行次序。

  • 没有父子goroutine的概念,所有的goroutine是平等地被调度和执行的。

  • Go程序在执行时会单独为main函数创建一个goroutine,遇到其他go关键字时再去创建其他的goroutine。

  • Go没有暴露 goroutine id给用户,所以不能在一个goroutine里面显式地操作另一个goroutine,不过runtime包提供了一些函数访问和设置goroutine的相关信息。

练习

  • 并发打印1000数字,1千字母
func main() {
    //并发的程序运行的结果每一次执行都不一定相同
    go num()
    go letter()
    time.Sleep(1*time.Second)
    fmt.Println("mian() is over")
}

func num() {
    for i := 0;i <= 1000; i++{
        fmt.Println("子goroutine中的i:",i)
        time.Sleep(1)   //加睡眠可以比较好的看出执行的结果
    }
}

func letter() {
    for i := 0; i <= 1000; i++ {
        fmt.Printf("\t子goroutine中的字母:%d,%c\n",i,i)
        time.Sleep(1)
    }
}
取出部分执行结果:
    子goroutine中的字母:995,ϣ
子goroutine中的i: 995
子goroutine中的i: 996
    子goroutine中的字母:996,Ϥ
mian() is over  //为何main不是最后结束呢?因为本程序产生了3个goroutine,在一个时间分片中cpu随机执行某个goroutine所以
子goroutine中的i: 997
  • 定义250毫秒打印一次数字,450毫秒打印一次字母
func main() {
    //并发的程序运行的结果每一次执行都不一定相同
    go printnum()
    go printLetter()
    time.Sleep(3*time.Second)
    fmt.Println("mian() is over")
}

func printnum() {
    for i := 0;i <= 1000; i++{
        fmt.Print(i,"\t")
        time.Sleep(250*time.Millisecond)
    }
}
func printLetter() {
    for i:=65;i<=70;i++{
        fmt.Printf("%c\t",i)
        time.Sleep(400*time.Millisecond)
    }
}
输出结果:
0   A   1   B   2   3   C   4   D   5   6   E   7   8   F   9   10  11  12  mian() is over  //很有动态的感觉

Go并发原理

  • https://segmentfault.com/a/1190000018150987#articleHeader1

runtime

Gosched

runtime.Gosched() 用于让出CPU时间片,让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次再获得cpu时间轮片的时候,从该出让cpu的位置恢复执行。

func main() {
    //创建一个goroutine
    go func(s string) {
        for i := 0; i < 2; i++ {
            fmt.Println(s)
        }
    }("world")

    for i := 0; i < 2; i++ {
        runtime.Gosched()  //import "runtime" 包
        /*
            屏蔽runtime.Gosched()运行结果如下:
                hello
                hello

            没有runtime.Gosched()运行结果如下:
                world
                world
                hello
                hello
        */
        fmt.Println("hello")
    }
}
  • 主协程进入main()函数,进行代码的执行。当执行到go func()匿名函数时,创建一个新的协程,开始执行匿名函数中的代码,主协程继续向下执行,执行到runtime.Gosched( )时会暂停向下执行,直到其它协程执行完后,再回到该位置,主协程继续向下执行

Goexit

调用 runtime.Goexit() 将立即终止当前 goroutine 执⾏,调度器确保所有已注册 defer延迟调用被执行

func main() {
    go func() {
        defer fmt.Println("第一个defer")
        func(){
            defer fmt.Println("第二个defer")
            runtime.Goexit()
            fmt.Println("执行成功B!")
        }()
        fmt.Println("执行成功A!")
    }()
    time.Sleep(1*time.Second)
}
执行结果:
第二个defer
第一个defer

GOMAXPROCS

  • 调用 runtime.GOMAXPROCS() 用来设置可以并行计算的CPU核数的最大值,并返回之前的值。
func main() {
//n := runtime.GOMAXPROCS(1)    // 第一次 测试
//打印结果:111111111111111111110000000000000000000011111...

n := runtime.GOMAXPROCS(2)         // 第二次 测试
//打印结果:010101010101010101011001100101011010010100110...
    fmt.Printf("n = %d\n", n)

    for {
        go fmt.Print(0)
        fmt.Print(1)
    }
}
  • 在第一次执行runtime.GOMAXPROCS(1) 时,最多同时只能有一个goroutine被执行。所以会打印很多1。过了一段时间后,GO调度器会将其置为休眠,并唤醒另一个goroutine,这时候就开始打印很多0了,在打印的时候,goroutine是被调度到操作系统线程上的。在第二次执行runtime.GOMAXPROCS(2) 时, 我们使用了两个CPU,所以两个goroutine可以一起被执行,以同样的频率交替打印0和1。

channels

通道可以被认为是Goroutines通信的管道。类似于管道中的水从一端到另一端的流动,数据可以从一端发送到另一端,通过通道接收。默认情况下,channel接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得goroutine同步变的更加的简单,而不需要显式的lock

  • channel <- value //发送value到channel

  • <-channel //接收并将其丢弃

  • x := <-channel //从channel中接收数据,并赋值给x

  • x, ok := <-channel //功能同上,同时检查通道是否已关闭或者是否为空

  • 案例

func main() {
    var ch1 chan int
    fmt.Println("(1)",ch1)  //nil
    ch1 = make(chan int)
    ch2 := make(chan bool)
    fmt.Println("(2)通道的内存地址:",ch1,)
    go func() {
        fmt.Println("\tfun(1) ----子goroutine----")
        data := <- ch1  //从通道中读取数据
        //time.sleep(1*time.Millsecond)触发data还未取出就已经结束main
        time.Sleep(1*time.Millisecond)
        fmt.Println("\tfun(2) 子goroutine从通道中读取到的数据是:",data)
        ch2 <- true //向通道中写数据表示结束
    }()
    //cpu调度资源被ch1抢占后可能会导致ch1通道没有取出数据main就结束了
    ch1 <- 100  //阻塞式mian goroutine向通道中写入数据
    <- ch2  //保证了程序必须取到值才解除阻塞,保证main不会迫使程序提前结束
    fmt.Println("(3) main() is over")
}

输出值:
(1) <nil>
(2)通道的内存地址: 0xc04203c060
    fun(1) ----子goroutine----
    fun(2) 子goroutine从通道中读取到的数据是: 100
(3) main() is over

channel的阻塞

Go的哲学是“不要通过共享内存来通信,而是通过通信来共享内存”,通道是Go通过通信来共享内存的载体。
通道是有类型的,可以简单地把它理解为有类型的管道。声明一个简单的通道语句是chan datarype,但是简单声明一个通道变量没有任何意义,a并没有初始化,其值是nil。Go语言提供一个内置函数make来创建通道。

  • 概念: 专门用于goroutine之间传递数据的类似通信的消息队列

  • 语法:数据类型make()也是引用类型的数据,创建通道要关联一个相关的类型,指定通道所能存储的数据类型,nil chan和map都是不能使用的。

  • 操作:goroutine可以从channel中读取数据,另外一个goroutine从中写入数据,操作符为 <-,对channel的读写都是堵塞的(程序是暂停的不允许执行直到解除堵塞)。

读数据: data:= <- chan
取数据: chan <- data

阻塞的3种方式

  • (1)向未初始化的通道写数据或读数据都会导致当前goroutine的永久阻塞。

  • (2)向缓冲区已满的通道写入数据会导致goroutine阻塞。

  • (3)通道中没有数据,读取该通道会导致goroutine阻塞。

两种通道

通道分为无缓冲的通道和有缓冲的通道,Go提供内置函数len和cap,无缓冲的通道的len和cap都是0,有缓冲的通道的len代表没有被读取的元素数,cap代表整个通道的容量。无缓冲的通道既可以用于通信,也可以用于两个goroutine的同步,有缓冲的通道主要用于通信。

无缓冲通道

无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。否则,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在

  • 阻塞:由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足,才接触阻塞。

  • 同步:在两个或多个协程(线程)间,保持数据内容一致性的机制。

有阻塞通道

有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个数据值的通道。
这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也不同。
只有通道中没有要接收的值时,接收动作才会阻塞。
只有通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。

  • 拥有缓冲区的channel可以进行数据的存储,存储至容量的上限后->阻塞

  • 具备异步通讯能力,不需要同时操作缓冲区

//创建一个无缓冲的通道
make(chan type)
//创建有10个缓冲的通道
make(chan type,10)
len(chan):channel中剩余的未读取数据的个数,cap(ch):通道的容量
  • 例子:
func main() {
    a := make(chan int,3)   //存放满3个元素之前不会阻塞
    fmt.Println("len=",len(a),"cap=",cap(a))
    go func() {
        for i:=0;i<8;i++ {
            a <- i
            fmt.Println("\tgoroutine:i=", i,"len=",len(a),"cap=",cap(a))
        }
    }()
    time.Sleep(2*time.Second)
    for i:=0;i<8;i++{
        num := <- a
        fmt.Println("num=",num)
    }
}
输出值:
len= 0 cap= 3
    goroutine:i= 0 len= 1 cap= 3
    goroutine:i= 1 len= 2 cap= 3
    goroutine:i= 2 len= 3 cap= 3
num= 0
num= 1
num= 2
num= 3
    goroutine:i= 3 len= 0 cap= 3
    goroutine:i= 4 len= 0 cap= 3
    goroutine:i= 5 len= 1 cap= 3
    goroutine:i= 6 len= 2 cap= 3
    goroutine:i= 7 len= 3 cap= 3
num= 4
num= 5
num= 6
num= 7
  • 上案例分析,不符合正常的情况原因:io延时,io对整体的影响输出和从通道读写数据是不同步的所以不符合预期

channel的关闭

对端可以判断channel是否已经关闭
    if num,ok := <-ch;ok== true{
    如果对端已经关闭,ok -> flase,num没有数据
    没有关闭,ok -> true,num保存读取到的数据 
    }

可以使用range替代ok:
    for rum := range ch{    //ch不能替换为<-ch
}
  • 练习
func main() {
    /*
    for range:
        数组、切片、map、string、chan
        数组/切片/string --> index,value
        map --> key,value
        chan --> value
     */
     ch1 := make(chan string)
     go senddata(ch1)
     for value:=range ch1{
        fmt.Println("通道中读取的数据:",value)
     }
}

func senddata(ch1 chan string) {
    for i := 1; i < 10; i++ {
        ch1 <- fmt.Sprintln("数据:",i)
    }
    fmt.Println(" --> 写入数据完成!!!")
    close(ch1)
}
  • 3个goroutine实现:一个向通道中写数据,两个争抢读通道的数据
func main() {
    /*
    for range:
        数组、切片、map、string、chan
        数组/切片/string --> index,value
        map --> key,value
        chan --> value
     */
     ch1 := make(chan string)
     go senddata(ch1)
     go sev1(ch1)
     go sev2(ch1)
     time.Sleep(1*time.Second)
}

func senddata(ch1 chan string) {
    for i := 1; i < 10; i++ {
        ch1 <- fmt.Sprintln("数据:",i)
    }
    fmt.Println(" --> 写入数据完成!!!")
    close(ch1)
}
func sev1(ch1 <- chan string) {
    for va := range ch1 {
        fmt.Println("读(1)中读取的数据:",va)
    }
}
func sev2(ch1 <- chan string) {
    for va1 := range ch1 {
        fmt.Println("读(2)中读取的数据:",va1)
    }
}

总结

  • 数据不发送完毕不关闭通道

  • 已经关闭的channel,不能再向其中写数据。否则报错:panic:send on closed channel

  • 写端已经关闭channel可以从中读取数据,无缓冲channel->读0,有缓冲的channel->如果缓冲区有数据读缓冲区数据,读完后再读为0

单向channel

var ch1 chan int    // ch1是一个正常的双向channel
var ch2 chan <- float64 // ch2是单向的写channel数据类型float64
var ch3 <- chan int //ch3是单向的读channel
  • chan <- :表示数据进入管道,要把数据写进管道对于调用着就是输出

  • <- chan :表示数据从管道出来,对于调用者是得到管道数据则为输入

  • 可以将channel隐式转换为单项队列,只收/只发,不能将单向channel转换为普通channel

func main() {
    var ch1 chan int
    var send chan <- int = ch1
    //可以编译,但是一旦执行会出错因为没有读取会阻塞报错 -> fatal error: all goroutines are asleep - deadlock!
    send <- 666
    var rec <- chan int = ch1
    num := <- rec   //只读但是没有数据给它读因为ch1中为空 -> fatal error: all goroutines are asleep - deadlock!
    fmt.Println(num)
}
  • 实际运用
func send(out chan <- int){
    out <- 99
    close(out)
}

func recv(in <- chan int){
    n := <- in
    fmt.Println(n)
}

func main(){
    ch := make(chan  int)
    go func() {
        send(ch)
    }()
    recv(ch)
}
输出结果:
99

定时器

time.Timer

Timer是一个定时器。代表未来的一个单一事件,你可以告诉timer你要等待多长时间。

type Timer struct {
   C <-chan Time
   r runtimeTimer
}
  • 它提供一个channel,在定时时间到达之前,没有数据写入timer.C会一直阻塞。直到定时时间到,向channel写入值,阻塞解除,可以从中读取数据。
func main(){
    /*
        time包对chan的操作
            1.Time:计时器
                NewTimer(duration) -> 取到*Time的对象,struct:字段C <- chan Time
            2.After(duration) <- chan Time
     */
     //创建计时器3秒
     time1 := time.NewTimer(3*time.Second)
     fmt.Printf("%T\n",time1)   //*time.Timer指针
     fmt.Println(time.Now())
     time2 := <- time1.C
     fmt.Println(time2)
     //使用After(),返回<-chan Time;等于Time.C
     ch1 := time.After(5*time.Second)
     fmt.Println("-------------------\n",time.Now())
     time3 := <-ch1
     fmt.Println(time3)
}
输出结果:
*time.Timer
2019-02-20 14:30:45.387 +0800 CST m=+0.002000001
2019-02-20 14:30:48.387 +0800 CST m=+3.002000001
-------------------
 2019-02-20 14:30:48.387 +0800 CST m=+3.002000001
2019-02-20 14:30:53.387 +0800 CST m=+8.002000001

time.Ticker

Ticker是一个周期触发定时的计时器,它会按照一个时间间隔往channel发送系统当前时间,而channel的接收者可以以固定的时间间隔从channel中读取事件

    type Ticker struct {
   C <-chan Time    // The channel on which the ticks are delivered.
   r runtimeTimer
}
  • 实例
func main() {
    //创建定时器,每隔1秒后,定时器就会给channel发送一个事件(当前时间)
    ticker := time.NewTicker(time.Second * 1)

    i := 0
    go func() {
        for { //循环
            <-ticker.C
            i++
            fmt.Println("i = ", i)

            if i == 5 {
                ticker.Stop() //停止定时器
            }
        }
    }() //别忘了()

    //死循环,特地不让main goroutine结束
    for {
    }
}

Select

select 语句类似于switch 语句,但是select会随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行。早在UNIX时代,select机制就已经被引入。通过调用select)函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了IVO动作,该select)调用就会被返回。后来该机制也被用于实现高并发的Socket 服务器程序。Go 语言直接在语言级别支持select关键字,用于处理异步I/O问题。

  • 与switch语句相比, select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作

执行条件

在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。
如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。
如果没有任意一条语句可以执行(即所有的通道都被阻塞)

  • 如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。

  • 如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去。

func main(){
    /*
        select:分支语句专门用于通道的读写操作的
            select{
            case chan 读/写
            case chan 读/写
            ....
            default:
            }
        执行过程:
            1.select虎随机的执行一个case语句
            2.如果没有case可运行,则走default,没有default则阻塞,等到有case再执行
     */
     ch1 := make(chan int,3)
     ch2 := make(chan int,3)
     go func(ch1 chan int,ch2 chan int) {
        for i:=1;i<=3;i++{
            ch1 <- i+2
            ch2 <- i+5
        }
     }(ch1,ch2)
     go func() {
        for {
            select {
            case data := <- ch1:
                fmt.Println("\t读取(ch1)中的数据",data)
            case data := <- ch2:
                fmt.Println("读取(CH2)中的数据",data)
            }
        }
     }()
    time.Sleep(2*time.Second)
}
输出的值:   //随机执行case得到印证
    读取(ch1)中的数据 3
读取(CH2)中的数据 6
    读取(ch1)中的数据 4
读取(CH2)中的数据 7
读取(CH2)中的数据 8
    读取(ch1)中的数据 5

Go语言没有提供直接的超时处理机制,但可以利用select机制。虽然select机制不是专为超时而设计的,却能很方便地解决超时问题,因为select的特点是只要其中一个case已经完成,程序就会继续往下执行,而不会考虑其他case的情况。

  • 练习
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    count := 0
    go func() {
        ch1 <- 100
    }()
    go func() {
        ch2 <- 200
    }()

    out:for {   //贴上标签
        time.Sleep(1*time.Second)
        select {
        case data,ok:= <-ch1:
            if !ok {
                fmt.Println("通道关闭",data)
                break out
            }
        case data := <- ch1:
            fmt.Println("ch1中读取数据",data)
        case data1 := <- ch2:
            fmt.Println("ch2中读取数据",data1)
        case <- time.After(2*time.Second):
            fmt.Println("超时!")
            count++
            if count == 3 {
                break out
            }
        }
    }
}
输出的值:
ch2中读取数据 200
ch1中读取数据 100
超时!
超时!
超时!

WaitGroup

goroutine和chan,一个用于并发,另一个用于通信。没有缓冲的通道具有同步的功能,除此之外,sync包也提供了多个goroutine同步的机制,主要是通过WaitGroup(同步等待组)实现的。

数据结构与操作:
type WaitGroup struct{
    //contains filtered or unexported fields
}
//添加等待信号
func(wg*WaitGroup)Add(delta int)
//释放等待信号
func(wg*WaitGroup)Done()
//等待
func(wg*WaitGroup)Wait()
  • WaitGroup用来等待多个goroutine完成,main goroutine 调用Add 设置需要等待 goroutine的数目,每一个goroutine结束时调用Done),Wait)被main用来等待所有的 goroutine完成。
var wg sync.WaitGroup
var tisk int=100    //全局变量
func main() {
    /*
        火车站卖票
     */
    //piao := make(chan int)
    wg.Add(4)
    count :=0
    go product("[窗口一]",count)
    go product("[窗口二]",count)
    go product("[窗口三]",count)
    go product("[窗口四]",count)
    wg.Wait()

}

func product(name string,a int) {
    rand.Seed(time.Now().UnixNano())
    for {
        if tisk >0 {
            tisk --
            a ++
            fmt.Println(name,":",tisk)
            time.Sleep(time.Duration(rand.Intn(100)))
        }else{
            fmt.Println("这里是 ->",name,"售票完毕!over!")
            fmt.Println("这里是 ->",name,"售票共计:",a)
            break
        }
    }
    wg.Done()
}
输出的值:
这里是 -> [窗口四] 售票完毕!over!
这里是 -> [窗口四] 售票共计: 25
这里是 -> [窗口二] 售票完毕!over!
这里是 -> [窗口二] 售票共计: 25
这里是 -> [窗口一] 售票完毕!over!
这里是 -> [窗口一] 售票共计: 25
这里是 -> [窗口三] 售票完毕!over!
这里是 -> [窗口三] 售票共计: 25

互斥锁

每个资源都对应于一个可称为 “互斥锁” 的标记,这个标记用来保证在任意时刻,只能有一个协程(线程)访问该资源。其它的协程只能等待
互斥锁是传统并发编程对共享资源进行访问控制的主要手段,它由标准库sync中的Mutex结构体类型表示。sync.Mutex类型只有两个公开的指针方法,Lock和Unlock。Lock锁定当前的共享资源,Unlock进行解锁。
在使用互斥锁时,一定要注意:对资源操作完成后,一定要解锁,否则会出现流程执行异常,死锁等问题。通常借助defer。锁定后,立即使用defer语句保证互斥锁及时解锁。

var mutex sync.Mutex    //定义互斥锁
func write() {
    mutex.Lock()
    defer mutex.Unlock()
}
  • 练习
var metex sync.Mutex
var a int=1
func print1(name string,a int) {
    //fmt.Println(name,"当前值:",a)
    a= a + 100
    fmt.Println(name,"当前值:",a)
    metex.Lock()
    fmt.Println(name,"枷锁成功!!!")
}

func main() {
    /*
    互斥锁:锁头对象(struck)
        互斥:指定解锁
        有两个指针的方法:
            lock()  -> 上锁
            unlock() -> 解锁
     */

    //metex.Lock()
    //a+=1
    go print1("Goroutine(1)",a)
    go print1("Goroutine(2)",a)
    go print1("Goroutine(2)",a)
    time.Sleep(1*time.Second)
    metex.Unlock()
    fmt.Println(a)
}
输出的值:Goroutine(1) 当前值: 101
Goroutine(1) 枷锁成功!!!
Goroutine(2) 当前值: 101
Goroutine(2) 当前值: 101
1

本质

  • 互斥锁的本质是当一个goroutine访问的时候,其他goroutine都不能访问。这样在资源同步,避免竞争的同时也降低了程序的并发性能。程序由原来的并行执行变成了串行执行

  • 对一个不会变化的数据只做“读”操作的话,是不存在资源竞争的问题的。因为数据是不变的,不管怎么读取,多少goroutine同时读取,都是可以的。不过存在即使合理在一定的特殊场景下使用互斥锁

  • 问题不是出在上,主要是修改,也就是。修改的数据要同步,这样其他goroutine才可以感知到。所以真正的互斥应该是读取和修改、修改和修改之间,读和读是没有互斥操作的必要的

读写锁

  • 共享数据的安全问题:一个共享的数据,被多个goroutine来访问,那么该数据是不安全

  • 读写锁(sync.RWMutex),将读写的动作完全分开

//写锁与写解锁
func (*RWMutex)Lock()
func (*RWMutex)Unlock()
//读锁与读解锁
func (*RWMutex)RLock()
func (*RWMutex)RUnlock()

特性

  • 处于读锁定状态,那么针对它的写锁定操作将永远不会成功,且相应的Goroutine也会被一直阻塞****

  • 读写锁控制下的多个写操作之间都是互斥的,并且写操作与读操作之间也都是互斥的。但是,多个读操作之间不存在互斥关系。

  • 从互斥锁和读写锁的源码可以看出,它们是同源的。读写锁的内部用互斥锁来实现写锁定操作之间的互斥。可以把读写锁看作是互斥锁的一种扩展。

var n int
var wg sync.WaitGroup
var rwm sync.RWMutex
func main() {
    /*
        读写锁:sync.RWMutex
        锁定的规则:
            1.读/写操作都是互斥的
            2.读/写是互斥的
            3.读读不互斥
     */
    wg.Add(10)
    for i:=1;i<=5;i++{
        go write(i)
    }
    for i:=1;i<=5;i++{
        go read(i)
    }
    wg.Wait()
}
func write(i int) {
    defer wg.Done()
    rand.Seed(time.Now().UnixNano())    //随机数
    rwm.Lock()
    fmt.Println("(写操作) -> Groutine",i,"即将写入操作")
    randnum := rand.Intn(100)+1
    n = randnum
    fmt.Println("(写操作):",i,"已经结束操作,写入了:",randnum)
    rwm.Unlock()
}
func read(i int) {
    defer wg.Done()
    rwm.RLock()
    fmt.Println("\t【读操作】-> Groutine",i,"即将读取数据!!!")
    v := n
    fmt.Println("\t【读到的值】:",v)
    rwm.RUnlock()
}
输出的值:
    【读操作】-> Groutine 5 即将读取数据!!!
    【读到的值】: 0
    【读操作】-> Groutine 3 即将读取数据!!!
    【读到的值】: 0
    【读操作】-> Groutine 4 即将读取数据!!!
    【读到的值】: 0
    【读操作】-> Groutine 2 即将读取数据!!!
    【读到的值】: 0
(写操作) -> Groutine 5 即将写入操作
(写操作): 5 已经结束操作,写入了: 97
    【读操作】-> Groutine 1 即将读取数据!!!
    【读到的值】: 97
(写操作) -> Groutine 2 即将写入操作
(写操作): 2 已经结束操作,写入了: 97
(写操作) -> Groutine 4 即将写入操作
(写操作): 4 已经结束操作,写入了: 52
(写操作) -> Groutine 1 即将写入操作
(写操作): 1 已经结束操作,写入了: 21
(写操作) -> Groutine 3 即将写入操作
(写操作): 3 已经结束操作,写入了: 83
  • 练习读写锁+结构体结合
//将锁和map封装成结构体使用
type MyMap struct {
    map2 map[string] string
    rwm sync.RWMutex
}
//将键值对存入到map中
func (m *MyMap) Put(key,value string){
    defer m.rwm.Unlock()
    m.rwm.Lock()
    m.map2[key] = value
}
func (m *MyMap) Get(key string) string{
    defer m.rwm.RUnlock()
    m.rwm.RLock()
    return m.map2[key]
}

func main()  {
    var wg sync.WaitGroup
    //不支持并发
    wg.Add(10)
    m1 := make(map[string] string)
    var rwm sync.RWMutex
    m2 := MyMap{m1,rwm}
    for i:=1;i<=10;i++ {
        go func(i int) {
            m2.Put(fmt.Sprint("key->",i),fmt.Sprint("data => ",i+rand.Intn(1000)))
            wg.Done()
        }(i)
    }
    wg.Wait()
    fmt.Println("开始读取数据.....")
    for i:=1;i<=10;i++ {
        fmt.Println("第",i,"波 : ",m2.Get(fmt.Sprint("key->",i)))
    }
}
输出值:
开始读取数据.....
第 1 波 :  data => 848
第 2 波 :  data => 61
第 3 波 :  data => 84
第 4 波 :  data => 322
第 5 波 :  data => 892
第 6 波 :  data => 546
第 7 波 :  data => 432
第 8 波 :  data => 464
第 9 波 :  data => 309
第 10 波 :  data => 91

条件变量

条件变量:作用并不保证在同意时刻仅仅有一个协程访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时候,通知阻塞在某个条件上的协程,条件变量不是锁,在并发中不能达到同步的目的,因此条件变量总是和锁一起使用的

type Cond struct {
   noCopy noCopy
   // L is held while observing or changing the condition
   L Locker
   notify  notifyList
   checker copyChecker
}

  • (1) func (c *Cond) Wait()
该函数的作用可归纳为如下三点:
a)  阻塞等待条件变量满足  
b)  释放已掌握的互斥锁相当于cond.L.Unlock()。 注意:两步为一个原子操作。
c)  当被唤醒,Wait()函数返回时,解除阻塞并重新获取互斥锁。相当于cond.L.Lock()
  • (2) func (c *Cond) Signal()

单发通知,给一个正等待(阻塞)在该条件变量上的goroutine(线程)发送通知。

  • (3) func (c *Cond) Broadcast()

广播通知,给正在等待(阻塞)在该条件变量上的所有goroutine(线程)发送通知。

func main() {
    /*
        条件变量:sync/cond,多个goroutine等待或接受通知的结合地
        L - > Lock接口 -> Cond条件变量总是和锁结合使用的
            三个指针方法:
                wait():等待goroutine等待接收通知,single(),Broadcast() -> 解除阻塞
                    阻塞方式:读键盘、waitgroup(Add(),Done())、chan(读写)、cond wait()
                single():发送通知
                Broadcast():广播,发送给所有人
    练习:
        main() Goroutine -> condition = false -> wait() ......
        g1 Goroutine -> condition = ture -> 广播/通知 -> 解锁
     */
     var mutex sync.Mutex
     condition := false
     cond := sync.Cond{L:&mutex}
     cond.L.Lock()  //上锁
     fmt.Println("main() -> 上锁")
     go func() {
        time.Sleep(1*time.Second)
        cond.L.Lock()
        fmt.Println("")
        fmt.Println("\t当前Goroutine已经锁定!\n","子Goroutine更改Condition,并通知主Goroutine")
        condition = true    //更改值
        cond.Signal()   //发通知给一个Goroutine
        cond.L.Unlock()
        fmt.Println("\t发送通知并解锁!!")
     }()

     if !condition {
        fmt.Println("\tmain() -> bool=false -> 等待被唤醒")
        //wait会尝试解锁等待 -> 当钱的Goroutine会进入阻塞的状态 -> 等待被唤醒
        //唤醒后又会锁定
        cond.Wait()
        fmt.Println("\tmian() -> 被唤醒!!!")
     }
     cond.L.Unlock()
     fmt.Println("main() -> 继续")
}
输出的值:
main() -> 上锁
    main() -> bool=false -> 等待被唤醒
    当前Goroutine已经锁定!
    子Goroutine更改Condition,并通知主Goroutine
    发送通知并解锁!!
    mian() -> 被唤醒!!!
main() -> 继续

生产消费者模型

  • (1) main函数中定义quit,其作用是让主协程阻塞。

  • (2) 定义product作为队列,生产者产生数据保存至队列中,最多存储3个数据,消费者从中取出数据模拟消费

  • (3) 条件变量要与锁一起使用,这里定义全局条件变量cond,它有一个属性:L Locker。是一个互斥锁。

  • (4) 开启5个消费者协程,开启3个生产者协程。

  • (5) producer生产者,在该方法中开启互斥锁,保证数据完整性。并且判断队列是否满,如果已满,调用wait()让该goroutine阻塞。当消费者取出数后执行cond.Signal(),会唤醒该goroutine,继续生产数据。

  • (6) consumer消费者,同样开启互斥锁,保证数据完整性。判断队列是否为空,如果为空,调用wait()使得当前goroutine阻塞。当生产者产生数据并添加到队列,执行cond.Signal() 唤醒该goroutine。

var oos sync.Cond
func main() {
    rand.Seed(time.Now().UnixNano())    //设置随机数种子
    quit := make(chan bool)     //创建用来结束通信的channel
    product := make(chan int,5)     //产品区使用的channel
    oos.L = new(sync.Mutex)     //创建互斥锁的变量和条件
    //创建3个生产者
    for i:=0;i<3;i++ {
        go producer(product,i+1)
    }
    //创建5个消费者
    for i:=0;i<5;i++{
        go consumer(product,i+1)
    }
    <-quit
}
//生产者
func producer(a chan int, id int) {
    for {
        oos.L.Lock()    //开启使用互斥锁
        for len(a) == 5 {   //判断是否生产长度满足3个队列
            oos.Wait()  //等待被消费完成
        }
        num := rand.Intn(1000)
        a <-  num
        fmt.Printf("当前生产者:%d,产生的数据:%d,当前的管道剩余:%d\n",id,num,len(a))
        oos.L.Unlock()  //生产数据结束,解锁互斥锁
        oos.Signal()    //唤醒被阻塞的消费者
        time.Sleep(1000*time.Millisecond)   //执行完成休息片刻让其他的可执行
    }
}
//消费者
func consumer(b chan int,id1 int) {
    for {
        oos.L.Lock()    //上锁互斥锁
        for  len(b) == 0  {
            oos.Wait()
        }
        num := <- b
        fmt.Printf("\t当前的消费者:%d,当前消费的数据:%d,通道剩余数据:%d\n",id1,num,len(b))
        oos.L.Unlock()
        oos.Signal()
        time.Sleep(time.Millisecond*500)
    }
}
调试输出的值:
当前生产者:1,产生的数据:895,当前的管道剩余:1
    当前的消费者:5,当前消费的数据:895,通道剩余数据:0
当前生产者:2,产生的数据:675,当前的管道剩余:1
    当前的消费者:3,当前消费的数据:675,通道剩余数据:0
当前生产者:3,产生的数据:8,当前的管道剩余:1
    当前的消费者:4,当前消费的数据:8,通道剩余数据:0
当前生产者:1,产生的数据:584,当前的管道剩余:1
    当前的消费者:1,当前消费的数据:584,通道剩余数据:0
当前生产者:2,产生的数据:220,当前的管道剩余:1
当前生产者:3,产生的数据:102,当前的管道剩余:2

蹒跚学Go第七天-并发编程》有9个想法

  1. I do trust all of the ideas you’ve presented in your post. They’re very convincing and can certainly work. Nonetheless, the posts are very short for beginners. May just you please lengthen them a bit from next time? Thanks for the post. kdgdecacdaed

  2. In fact when someone doesnt understand then its up to other visitors that they will help, so here it takes place. efeckdbgfedk

  3. Helpful info. Fortunate me I found your website by chance, and I’m shocked why this twist of fate did not took place earlier! I bookmarked it. fafafbgdgbec

  4. obviously like your website however you need to check the spelling on several of your posts. Several of them are rife with spelling issues and I to find it very bothersome to tell the truth on the other hand I will definitely come again again. bkedeefdebae

  5. Pretty portion of content. I just stumbled upon your web site and in accession capital to assert that I acquire actually enjoyed account your blog posts. Any way I will be subscribing in your augment or even I fulfillment you access constantly quickly. ackecddgeeec

  6. Good post. I study something more challenging on completely different blogs everyday. It is going to all the time be stimulating to learn content material from different writers and practice somewhat one thing from their store. I dgcakkkafdebbkge

发表评论

电子邮件地址不会被公开。 必填项已用*标注