并发编程

并发编程1 goroutine 1.1 goroutine 的调度模型1.1.1 MPG 模式基本介绍1.1.2 MPG 模式运行的状态 11.13 MPG 模式运行的状态 21.2 goroutine 基本介绍1.2.1 进程和线程的说明:1.2.2 并发和并行1.2.3 Go 协程和 Go 主线程1.3 goroutine 快速入门1.3.1 主线程和协程执行的示意图1.3.2 例程间通讯1.4 匿名函数开启 goroutine1.4.1 在匿名函数中使用 sync.WaitGroup1.4.1.1 方法一:直接调用1.4.1.2 方法二:传递 sync.WaitGroup 到匿名函数中1.4.4 例程中的闭包陷阱1.5 互斥锁&原子操作(同步)1.5.1 共享数据互斥锁机制1.5.1.1 不加锁范例1.5.1.2 互斥锁范例1.5.1.3 全局函数实现互斥锁1.5.2 原子操作(了解)1.6管道(异步)1.6.1 channel 的介绍1.6.1.1 无缓冲管道和带缓冲管道区别1.6.2 channel (管道) – 基本使用1.6.2.1 channel 初始化:1.6.3 无缓冲管道的定义和读取、写入1.6.3.1 无缓冲区管道读取阻塞案例1.6.4 关闭管道1.6.4.1 关闭管道写入操作1.6.4.2 关闭管道读取操作1.6.4.3 判断管道是否关闭1.6.5 管道遍历1.6.6 带缓冲区的管道操作1.6.6.1 带缓冲管道结和 goroutine 使用1.6.6.2 通过管道来实现例程间的通讯1.6.7 只读和只写管道1.6.8 select-case 语句 多路复用1.6.8.1 对管道中的值进行操作1.6.8.2 default 语句1.6.8.3 select-case 超时机制1.6.8.4 time.After 实现超时机制1.6.8.5 time.Tick 函数每隔多少秒返回一个管道2 sync 包 runtime 包

并发编程

并发编程开发将一个执行的过程按照并行算法拆分为多个可以独立执行的代码块,从而充分利用多核和多处理器提高系统吞吐率

goroutine(协程) 和 channel(管道)

goroutine 可以做并发和并行的处理,可以将一个任务分解为多个 goroutine 去完成,让多个 cpu 去处理

需求:

要求统计 1-20000 的数字中,那些是素数

分析思路:

  1. 传统方法,使用一个循环,循环判断各个数是不是素数。

  2. 使用并发或者并行的方法,将统计素数的任务分配给多个 goroutine 去完成,这会使用到 goroutine

1 goroutine

1.1 goroutine 的调度模型

1.1.1 MPG 模式基本介绍

image-20210509154007830

  1. M:操作系统的主线程(是物理线程)

  2. P:协程执行需要的上下文(可以理解为协程需要的运行环境)

  3. G:协程

1.1.2 MPG 模式运行的状态 1

image-20210509154424656

  1. 当前程序有个 三个 M ,如果三个 M 都在一个 CPU 上运行,就是并发。如果在不同的 CPU 上运行就是并行

  2. M1,M2,M3 正在执行一个 G(协程),M1 的协程队列有三个,M2 的协程队列有 3 个,M3 协程队列有 2 个

  3. 从上图可以看到:Go 的协程是轻量级的线程,是逻辑态的,Go 可以容易的起上万个协程。

  4. 其他程序 C/JAVA 的多线程,往往是内核态,比教重量级,几千个线程可能耗光 CPU 资源。

1.13 MPG 模式运行的状态 2

image-20210509155037094

  1. 分成两部分来看。

  2. 原来的情况是 M0 主线程正在执行 G0 协程,另外有三个协程在队列等待。

  3. 如果 G0 协程阻塞,比如读取文件或者数据库等

  4. 这时候就会 创建 M1 主线程(也可能是从已有的线程池中取出 M1),并且将原本在 M0 下面等待的 3 个协程挂到 M1 下开始执行,M0 的主线程下 G0 仍然在执行文件 io 的读写。

  5. 这样的 MPG 调度模式,可以既让 G0 执行,同时也不会让队列的其它协程一直阻塞,仍然可以并发 / 并行执行。

  6. 等到 G0 不阻塞了,M0 会被放到空闲的主线程继续执行(从已有的线程池中取),同时 G0 又被唤醒

1.2 goroutine 基本介绍

1.2.1 进程和线程的说明:

  • 进程:资源分配的基本单位

  • 线程:CPU 调度的基本单位

  1. 进程就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位。

  2. 线程是进程的一个执行实例,是 CPU 调度的基本单位,是程序执行的最小单位,他是比进程更小的能够独立运行的基本单位

  3. 一个进程可以创建和销毁多个线程,同一个进程中的多个线程可以并发执行,而且一个进程退出后,线程也会退出和销毁

  4. 一个程序至少有一个进程,一个进程至少有一个线程或者多个线程

1.2.2 并发和并行

顺序、并发与并行

  • 顺序是指发起执行的程序只能有一个(只占用一个 CPU 或 只占用一个核)

  • 并发是指多个程序同时发起执行(同时处理)的程序可以有多个(单车道并排只能有一辆车,可同时驶入路段多辆车)

  • 并行是指同时执行(同时做)的程序可以有多个 (多车道并排可以有多个车)

并发:

多线程,程序在单核上运行,就是并发(并发的特点是,多个任务作用在一个 cpu 上,从微观的角度上看,在一个时间点上其实只有一个任务在执行,但是 cpu 的时间片很短,所以给用户的感觉就是多个任务在进行)。

因为是在一个 cpu 上,比如有 10 个线程,每个线程执行 10 毫秒(进行轮询操作),从用户的角度看,好像 10 个线程都在运行,但是从微观上看,在某一个时间点看,其实只有一个线程在执行,这就是并发

并行:

在 go 语言中就支持并行执行。

多线程,程序在多核上运行,就是并行(并行,就是将多个任务分配到多个 cpu 上去同时执行,从微观的角度上看,在同一个时间点有多个任务在同时执行。

因为是在多个 CPU 上(比如有 10 个 CPU),比如有 10 个线程,每个线程执行 10 毫秒(各自在不同的 CPU 上执行),从人的角度上看,这 10 个线程都在运行,但是从微观上看,在某一个时间点看,也同时有 10 个线程在执行,这就是并行

总结:

并行的效率一定比并发快。

1.2.3 Go 协程和 Go 主线程

Go 语言中每个并发执行的单元叫 Goroutine,使用 go 关键字后接函数调用来创建一个 Goroutine

函数调用和函数不是一回事

go 的例程是一个用户态的实现

  1. GO 主线程(有程序员直接称为线程 / 也可以理解为 进程一个 Go 线程上,可以起多个协程,可以理解为,协程就是轻量级的线程[编译器做了优化]。

  1. Go 的协程特点:

    有独立的栈空间

    共享程序堆空间

    调度由程序员控制

    协程是轻量化的线程

1.3 goroutine 快速入门

在下面演示范例中,我们有两个函数,分别是TaskATaskB ,我们要实现并行操作,也就是同时运行这两个函数。

package main

import (
    "fmt"
    "time"
)

func TaskA() {
    for i := 1; i <= 10; i++ {
        fmt.Println(i)
    }
}

func TaskB() {
    for i := 'A'; i <= 'Z'; i++ {
        fmt.Println(string(i))
    }
}

func main() {
    fmt.Println("start")

    // go 关键字开启并发执行的例程,这是一个特定语法
    go TaskB()
    go TaskA()

    time.Sleep(time.Second * 2)
    fmt.Println("END")
}

执行:

从下图中我们可以看到 TaskB() 函数中输出的字母与 TaskA() 中输出的数字并发运行了,从而实现了 go 的并发编程

image-20210702221438886

我们从这个示例上可以看到 A 和 B 两个函数不知道谁先执行的运行关系了。已经不能预知,因为这个中间涉及到一个调度问题,当然我们也可以通过人为的方式来实现他们的调度关系,有两种方式可以实现自定义执行顺序

我们在写代码的时候不会主动去实现调度关系,只是在演示的时候才需要这么做,这样更好的方便我们来理解 goroutine

方式一如下范例:

package main

import (
    "fmt"
    "time"
)

func TaskA() {
    for i := 1; i <= 10; i++ {
        fmt.Println(i)
        
        // 每次执行暂停一秒
        time.Sleep(1 * time.Second)
    }

}

func TaskB() {
    for i := 'A'; i <= 'Z'; i++ {
        fmt.Println(string(i))

        // 每次执行暂停一秒
        time.Sleep(1 * time.Second)
    }
}

func main() {
    fmt.Println("start")
    // go 关键字
    go TaskB()
    go TaskA()

    time.Sleep(time.Second * 10)
    fmt.Println("END")
}

输出

通过输出我们可以看到他每次打印都是在间隔一秒,释放 cpu 调度任务,从而执行另一个任务

image-20210702230011244

方式二如下范例:

通过 runtime 包实习

可以通过 runtime 包中的 GoSched 让例程主动让出 CPU,也可以通过 time.Sleep 让例程休眠从而让出 CPU

package main

import (
    "fmt"
    "runtime"
    "time"
)

func TaskA() {
    for i := 1; i <= 10; i++ {
        fmt.Println(i)

        // runtime.Gosched() 该包是主动让出调度,让其他任务执行
        runtime.Gosched()
    }

}

func TaskB() {
    for i := 'A'; i <= 'Z'; i++ {
        fmt.Println(string(i))
        
        // runtime.Gosched() 该包是主动让出调度,让其他任务执行
        runtime.Gosched()
    }
}

func main() {
    fmt.Println("start")
    // go 关键字
    go TaskB()
    go TaskA()

    time.Sleep(time.Second * 3)
    fmt.Println("END")
}

输出我们会发现他在每次调度的时候都会主动让出调度操作,从而执行另外一个任务。

image-20210702230417182

通过上面的几个案例可以看到现在 A 和 B 任务已经在并发执行了

1.3.1 主线程和协程执行的示意图

image-20210509145950535

  • 程序一旦开始,我们会认为一个主线程或一个协程开始执行了

  • 然后继续执行会发现有一个 go test() 函数,开启协程

    • 如果主线程退出了,则协程即使没有执行完毕,也会退出

    • 当然协程也可以再主线程没有退出前就自己结束了,比如完成了自己的任务

  • 主线程依旧执行 for 循环

  • 等于说需要一定时间给 go 协程分配 cpu

  • 主线程退出,整个程序退出,协程也会退出

如下范例

package main

import (
    "fmt"
)

func TaskA() {
    for i := 1; i <= 10; i++ {
        fmt.Println(i)
    }

}

func TaskB() {
    for i := 'A'; i <= 'Z'; i++ {
        fmt.Println(string(i))
    }
}

// main 就是一个主线程
func main() {
    fmt.Println("start")

    // go 关键字开启协程 
    go TaskB()
    go TaskA()
    fmt.Println("END")
}

输出

[23:09:02 root@go codes]#go run goroutine.go 
start
END

# 我们会发现当我们的 main 主线程执行完了之后就会退出程序,导致 TaskB()、TaskA() 两个协程没有执行
# 验证了主线程退出,整个程序退出,协程也会退出

由此引主线程与协程之间的通信:

1.3.2 例程间通讯

我们很多时候无法预期我们的 工作例程的执行时间。

main 函数也是由一个例程来启动执行,这个例程称为主例程,其他例程叫工作例程。主例程结束后工作例程也会随之销毁,使用 sync.WaitGroup (计数信号量)来维护执行例程执行状态

type WaitGroup struct 是一个结构体

原理:

计数信号量:
启动例程之前,计数信号量添加 +1
当例程执行结束时,计数信号量 -1
当计数信号量为 0 时就表示例程已经执行结束了

查看 waitgroup 结构体的方法

[10:28:05 root@go codes]#go doc sync.waitgroup
package sync // import "sync"

type WaitGroup struct {
        // Has unexported fields.
}
    A WaitGroup waits for a collection of goroutines to finish. The main
    goroutine calls Add to set the number of goroutines to wait for. Then each
    of the goroutines runs and calls Done when finished. At the same time, Wait
    can be used to block until all goroutines have finished.

    A WaitGroup must not be copied after first use.

func (wg *WaitGroup) Add(delta int)     # 添加计数器
func (wg *WaitGroup) Done()             # 减一计数器
func (wg *WaitGroup) Wait()             # 等待,判断计数信号量值是否为 0 ,为 0 时表示工作例程执行完毕然后在执行主例程

范例:

package main

import (
    "fmt"
    "sync"
)

// 由于 sync.WaitGroup 是一个结构体只拷贝类型,我要对同一块内存地址操作所以需要传入指针
func TaskA(wg *sync.WaitGroup) {
    // 当 TaskA 工作例程执行完毕之后信号量减一,使用 defer 是为了防止程序出错后退出执行 wg.Done()
    defer wg.Done()

    for i := 1; i <= 10; i++ {
        fmt.Println(i)
    }

}

func TaskB(wg *sync.WaitGroup) {
    // 当 TaskB 工作例程执行完毕之后信号量减一
    defer wg.Done()
    for i := 'A'; i <= 'Z'; i++ {
        fmt.Println(string(i))
    }

}

func main() {
    //  计数信号量:
    // 启动例程之前,计数信号量添加 +1
    // 当例程执行结束时,计数信号量 -1
    // 当计数信号量为 0 时就表示例程已经执行结束了

    fmt.Println("start")

    // 调用 sync.WaitGroup{} 结构体,用的是 new 函数取地址操作
    wg := new(sync.WaitGroup)

    // 当前有两个工作例程,所以添加 2
    wg.Add(2)

    // go 关键字开启工作例程
    go TaskB(wg)
    go TaskA(wg)

    // 在 END 之前等待,等待计数信号量值是否为 0,为 0 时表示工作例程执行完毕
    wg.Wait()
    fmt.Println("END")
}

执行,通过执行我们会发现这种方法就已经实现了例程间的通讯

[11:03:20 root@go codes]#go run goroutine.go 
start
1
2
3
4
5
6
...
END

1.4 匿名函数开启 goroutine

在调用 匿名函数的时候,一定要记住 go 关键字后面跟的是函数调用

image-20210703112936783

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Println("start")

    go func() {
        for i := 0; i <= 10; i++ {
            fmt.Println(i)
        }
    }()

    go func() {
        for i := 'A'; i <= 'Z'; i++ {
            fmt.Printf("%c\n", i)
        }
    }()

    time.Sleep(time.Microsecond * 10)
    fmt.Println("end")
}

执行

[11:27:59 root@go codes]#go run goroutine.go 
START
A
B
C
D
E
F
...
END

1.4.1 在匿名函数中使用 sync.WaitGroup

在匿名函数中调用 sync.WaitGroup 有两种方法

1.4.1.1 方法一:直接调用

注意:由于这两个匿名函数的作用域都是在 main() 主函数里面的,所以能够直接调用

因为一个变量在当前作用域下没有找到它会向父作用域去找

而且这个 wg.Done() 使用的是父作用域里面的的 sync.WaitGroup{} 没有重新赋值也就不会在内存中出现赋值拷贝的过程,都是在对同一个 sync.WaitGroup{} 进行操作,所以能够使用值类型的结构体

但是尽量使用的方式比较好。

package main

import (
    "fmt"
    "sync"
)

func main() {
    fmt.Println("start")
    
    // 定义 sync.WaitGroup 一个 wg 的结构体
    wg := sync.WaitGroup{}
    
    // 添加两个计数信号量,因为有两个匿名函数
    wg.Add(2)
    go func() {
        for i := 0; i <= 10; i++ {
            fmt.Println(i)
        }
        
        // 调用 wg.Done() 执行完毕之后就减掉一个计数信号量,调用父作用域中的 wg.Done() 方法
        // 而且这个 wg.Done() 使用的是父作用域里面的的  sync.WaitGroup{} 没有重新赋值也就不会在内存中出现赋值拷贝的过程,都是在对同一个 sync.WaitGroup{} 进行操作,所以能够使用值类型的结构体
        wg.Done()
    }()

    go func() {
        for i := 'A'; i <= 'Z'; i++ {
            fmt.Printf("%c\n", i)
        }
        
         // 调用 wg.Done() 执行完毕之后就减掉一个计数信号量,调用父作用域中的 wg.Done() 方法
        wg.Done()
    }()

    wg.Wait()
    fmt.Println("end")
}

执行

[11:27:59 root@go codes]#go run goroutine.go 
START
A
B
C
D
E
F
...
END

1.4.1.2 方法二:传递 sync.WaitGroup 到匿名函数中

当然也可以将 sync.WaitGroup 直接传递到匿名结构体中,由于需要重新赋值会在内存中出现赋值拷贝的过程,所以直接传递的时候需要传递指针类型

因为他们是在不同的作用域下

package main

import (
    "fmt"
    "sync"
)

func main() {
    fmt.Println("start")
    wg := new(sync.WaitGroup)
    wg.Add(2)

    // 如果在匿名函数进行传递操作就需要传递 *sync.WaitGroup 指针类型
    // 因为重新赋值会在内存中出现赋值拷贝的过程
    go func(wg *sync.WaitGroup) {
        for i := 0; i <= 10; i++ {
            fmt.Println(i)
        }
        wg.Done()
    }(wg)

    go func(wg *sync.WaitGroup) {
        for i := 'A'; i <= 'Z'; i++ {
            fmt.Printf("%c\n", i)
        }
        wg.Done()
    }(wg)

    wg.Wait()
    fmt.Println("end")
}

执行

[11:27:59 root@go codes]#go run goroutine.go 
START
A
B
C
D
E
F
...
END

1.4.4 例程中的闭包陷阱

因为闭包使用函数外变量,当例程执行是,外部变量已经发生变化,导致打印内容不正确,可使用在创建例程时通过函数传递参数(值拷贝)方式避免

image-20210703151457829

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := sync.WaitGroup{}

    // 使用 for 循环启动 3 个例程
    fmt.Println("start")
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            fmt.Println(i)
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println("end")
}

// 主例程执行完了,工作例程才执行,这时候当主例程 3 次循环完了之后 i 已经变为了 3
// 然后再执行 3 个工作例程, 这是 i 已经变为了 3 所以输出的就是 3 

执行

[12:05:51 root@go codes]#go run closer.go 
start
3
3
3
end

当然我们可以通过值传递,将 i 传递到内部函数中成为自己的局部变量

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}

	// 使用 for 循环启动 3 个例程
	fmt.Println("start")
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {		// 将 i 传到到匿名函数中
			fmt.Println(i)
			wg.Done()
		}(i)					// 将 i 传到到匿名函数中
	}
	wg.Wait()
	fmt.Println("end")
}

执行

[15:17:33 root@go codes]#go run closer.go 
start
2
0
1
end

# 执行就能够看到正常输出 0 1 2

1.5 互斥锁&原子操作(同步)

1.5.1 共享数据互斥锁机制

共享数据(同步)

多个并发程序需要对同一个资源进行访问,则需要先申请资源的访问权限,同时再使用完成后释放资源的访问权。当资源被其他程序已申请访问权后,程序应该等待访问权被释放并被申请到时进行访问操作。同一时间资源只能被一个程序访问和操作

image-20210509223117926

对全局变量加锁,当 协程1 对该 map 进行操作的时候就,对 map 空间先加上一把锁,然后 协程1 执行完了之后就解锁。

如果协程一还在操作这把锁,这时候 协程2 来看目前这把锁是不是加锁状态,如果是加锁状态 协程2 就去队列缓冲,这个队列其实就是一个数据结构。

这时候协程三也来了,但是还是加锁状态,协程三也就老老实实的跑去队列中

依此类推

如果 协程1 操作完了之后就会将这把锁解开,然后底层有一种机制,会从这个队列里面取出排在最前面的协程去工作,这样就好比商场排队结账。

1.5.1.1 不加锁范例

引出锁机制案例:

我们看这个示例,开启 20 个工作例程,其中 10 个例程分别给 salary += 10 加 1000 次,另外 10 个例程分别给 salary -= 10 减 1000 次。最后的结果 salary = 0

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    fmt.Println("start")
    // sync.WaitGroup 计数信号量赋值给 wg
    wg := sync.WaitGroup{}
    salary := 0

    // 循环 10 ,里面分别有两个工作例程,也就是开启了 20 个工作例程
    for i := 0; i < 10; i++ {
        
        // 这个 for 里面有两个工作例程,所以计数信号量为 2
        wg.Add(2)
        
        // 第一个工作例程 salary += 10 加 1000 次
        go func() {
            // 计数信号量减一
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                salary += 10
                runtime.Gosched()   // GoSched 让例程主动让出 CPU
            }
        }()

        // 第二个工作例程 salary -= 10 减 1000 次
        go func() {
             // 计数信号量减一
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                salary -= 10
                runtime.Gosched()   // GoSched 让例程主动让出 CPU
            }
        }()
    }

    wg.Wait()
    fmt.Println("end", salary)
}

执行

但是会发现每次 salary 输出的结果都不一样,和我们的预期结果是不一样的。

image-20210703162613040

因为如下图,在计算机中, CPU 会把数据从内存拿到寄存器进行计算,但是计算机每次的执行顺序都是不同的,并不是说它会将第一个 for 循环中的 salary += 10 之后再执行下一个 for 循环中的数,而是有可能第一步执行把 salary 从 0 的时候放入到 寄存器中,第二把直接执行 salary -= 10 的操作,因为并发是多个协程之间同时执行

image-20210703163046741

1.5.1.2 互斥锁范例

由此引出了互斥锁制,也就是当有 协程1 在对共享数据进行操作的时候,其他的协程就别操作。当 协程1 执行完毕之后其他的协程在进行操作

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    salary := 0
    wg := sync.WaitGroup{}

    // 定义互斥锁,使用 sync.Mutex 结构体
    var lock sync.Mutex

    fmt.Println("start")
    for i := 0; i < 1; i++ {
        wg.Add(2)
        go func() {
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                lock.Lock()   // 第一个协程操作 salary 共享数据时加锁
                salary += 10  // 对 salary 共享数据进行操作
                lock.Unlock() // 操作 salary 共享数据完毕后解锁其他协程可以对共享数据进行操作
                runtime.Gosched()

            }
        }()

        go func() {
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                lock.Lock()   // 第二个协程操作 salary 共享数据时加锁
                salary -= 10  // 对 salary 共享数据进行操作
                lock.Unlock() // 操作 salary 共享数据完毕后解锁
                runtime.Gosched()   

            }
        }()
    }
    wg.Wait()
    fmt.Println("end", salary)
}

执行发现不管我们运行多少变 salary 最后的值都是 0 ,这就是通过互斥锁的机制把对 salary 共享数据的操作,让不同工作例程之间同时只有一个工作例程对 salary 进行操作。

image-20210703165329135

注意:

一般在进行加锁解锁的时候,都是把锁指定到共享数据的位置,也就是把锁控制在很小的一个范围内,这也是在做互斥锁的时候防止死锁的重要的一个方式,只在关键的步骤加锁。如果不是内部匿名函数的话,在其他函数执行互斥锁操作时,也需要把锁传入到其他函数中。

1.5.1.3 全局函数实现互斥锁

由于需要在全局函数实现互斥锁所以需要传入我们的 sync.WaitGroupsync.Mutex 两个结构体,但是结构体和函数是值类型,所以需要传入这两个结构体的指针类型,才能实现对同一个互斥锁和同一个共享空间的操作

package main

import (
    "fmt"
    "sync"
)

// 传入 *sync.WaitGroup,  *sync.Mutex, salary 的指针类型,实现对同一个互斥锁和同一个共享空间操作
func TaskA(wg *sync.WaitGroup, lock *sync.Mutex, salary *int) *int {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        
        // 加锁
        lock.Lock()
        *salary += i
        
        // 解锁
        lock.Unlock()
    }
    return salary
}

func TaskB(wg *sync.WaitGroup, lock *sync.Mutex, salary *int) *int {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        lock.Lock()
        *salary -= i
        lock.Unlock()
    }
    return salary
}

func main() {
    salary := 0
    wg := sync.WaitGroup{}
    // 定义互斥锁,使用 sync.Mutex 结构体
    var lock sync.Mutex

    fmt.Println("start")
    
    // 循环 10 执行两个协程,也就是开启了 20 个工作例程
    for i := 0; i < 10; i++ {
        wg.Add(2)
        go TaskA(&wg, &lock, &salary)
        go TaskB(&wg, &lock, &salary)
    }
    wg.Wait()
    fmt.Println("end", salary)
}

多次输出我们的 salary 结果都为 0

image-20210703175716577

1.5.2 原子操作(了解)

其实我们互斥锁的底层就是一个原子操作的过程

原子操作是指程序执行过程不能中断的操作 s,go 语言 sync/atomic 包中提供提供了五类原子操作函数,其操作对象为整数型或整数指针

  • Add:增加/减少

  • Load:载入

  • Store:存储

  • Swap:两个变量之间交换

  • Swap:更新

  • CompareAndSwap:比较第一个参数引用指是否与第二个参数值相同,若相同则将第一

    个参数值更新为第三个参数

通过原子操作不用 lock 互斥锁的方式实现对共享数据的同步操作

package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)

func main() {
    var salary int64 = 0
    wg := sync.WaitGroup{}

    fmt.Println("start")
    for i := 0; i < 10; i++ {
        wg.Add(2)
        go func() {
            defer wg.Done()
            for i := 0; i < 1000; i++ {

                // 使用原子锁,当有协程在操作的时候其他协程不能够对 salary 共享数据进行操作
                // 共享数据必须传入指针
                atomic.AddInt64(&salary, +10)
                runtime.Gosched()
            }
        }()
        go func() {
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                atomic.AddInt64(&salary, -10)
                runtime.Gosched()
            }
        }()
    }

    wg.Wait()
    fmt.Println("end", salary)
}

不管怎么运行他的结果都是不变的

image-20210703182259525

1.6管道(异步)

数据处理者处理完数据后将数据放入缓冲区中,数据接收者从缓冲区中获取数据,处理者不用等待接收者是否准备好处理数据

为什么需要 channel:

前面使用全局变量加锁同步来解决 goroutine 得通讯,但是不够完美。

  1. 主线程在等待所有 goroutine 全部完成的时间很难确定。

  2. 如果主线程休眠时间长了,会加长等待时间,如果等待时间短了,可能还有 goroutine 处于在工作状态,这时会随着主线程的退出而销毁。

  3. 通过全局变量加锁同步来实现通讯,也并不利用多个协程对全局变量的读写操作。

  4. 上面种种分析都在呼唤一个新的通讯机制-channel

1.6.1 channel 的介绍

  1. 管道的数据是先进先出【FIFO:first in first out】

  2. 线程安全,也就是都在这个管道的时候,不管有多少个协程在对同一个管道有多少个操作。直接用绝对不出问题,这一点是由编译器在底层维护的。当多个 goroutine 访问时,不需要加锁,也就是说 channel 本身就是线程安全的。

    channel 时线程安全,多个协程操作同一个管道时,不会发生资源竞争问题

  3. channel 时有类型的,一个 string 的 channel 只能够存放 string 类型数据。如果是 int 就只能发 int,当我们需要放多个类型的时候只需要将 channel 声明为 interface{} 即可,一般来讲没必要将其设置为空接口类型,因为在取的时候还要通过类型断言来取。

1.6.1.1 无缓冲管道和带缓冲管道区别

无缓冲的管道保证进行发送和接收的 goroutine 会在同一时间进行数据交换,而有缓冲的管道没有这种保证。

在无缓冲管道的基础上,为管道增加一个有限大小的存储空间形成带缓冲管道。带缓冲通道在发送时无需等待接收方接收即可完成发送过程,并且不会发生阻塞,只有当存储空间满时才会发生阻塞。同理,如果缓冲管道中有数据,接收时将不会发生阻塞,直到管道中没有数据可读时,管道将会再度阻塞。

无缓冲管道保证收发过程同步。而无缓冲是异步的收发过程,因此效率可以有明显的提升。

1.6.2 channel (管道) – 基本使用

定义/声明 channel:

var 变量名 chan 数据类型

// var   :后面定义的是管道名。
// chan  :是关键字
// 数据类型:表示这个管道属于什么样的数据类型

初始化的三种方式举例:

// 1. 通过 var 声明管道
    var intChan chan int (intChan 用于存放 int 数据)
    var mapChan chan map[int]string (mapChan 用于存放 map[int]string 类型)
    var perChan chan Person 也能够放入 person 结构体
    var perChan2 chan *Person 还可以放结构体的指针
// 初始化
    intChan = make(chan int)
    mapChan = make(chan map)
    perChan = make(chan Person)
    perChan2 = make(chan *Person)

// 2.通过声明的同时并初始化
    var intChan chan int = make(chan int)
    var mapChan chan map[int]string = make(chan map[int]string)
    var perChan chan person = make(chan person)
    var perChan2 chan *person = make(chan *person)

// 3.短声明的同时并初始化
    intChan := make(chan int)
    mapChan := make(chan map[int]string)
    perChan := make(chan person)
    perChan2 := make(chan *person)

说明:

  1. channel 是引用类型

  2. channel 必须初始化才能写入数据,即 make 后才能使用

1.6.2.1 channel 初始化:

使用 make 函数初始化,make(chan type)/make(chan type, len),不带 len 参数的用于创建无缓存区的管道,使用 len 创建指定缓冲区长度的管道

说明:使用 make 进行初始化

var intChan chan int
intChan = make(chan int,10)

/* 说明:
    1. var intChan :声明了 chan 名称
    2. chan int    :指定该 chan 为 int 类型
    3. intChan = make(chan int, 10) 是进行初始化,使用 make 并且类型为 chan int,容量为 10
*/

范例代码:

package main

import (
    "fmt"
)

// 演示管道的使用
func main() {
    // 1.创建一个可以存放三个 int 类型的管道
    var intChan chan int
    intChan = make(chan int, 3)

    // 2.看看 intChan 是什么
    fmt.Printf("intchan 的值 = %v\nintchan 本身也有地址 = %p", intChan, &intChan)
}

输出:

通过输出,我们发现他的值是一个地址

image-20210513204943659

可以将其理解为:

intChan 指向了一个地址:0xc0000de080 ,而这个地址 0xc0000de080又指向了真正的管道。

在我们的内存里面,intChan 本身也有地址为:0xc0000d8018

由此我们可以理解,为什么我们在传一个 channel 的时候,传入到一个函数里面去其实是操作的同一个channel,因为 channel 是引用类型。

1.6.3 无缓冲管道的定义和读取、写入

无缓冲的管道是指在接收前没有能力保存任何值的通道。这种类型的管道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。

如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。

无缓冲区管道必须要有一个工作例程去读它不然无法写入数据到管道中,否则会出现死锁

管道是声明需要指定管道存放数据的类型,管道原则可以存放任何类型,但只建议用于存放值类型或者只包含值类型的结构体。在管道声明后,会被初始化为 nil

可通过操作符<-和->对管道进行读取和写入操作,当写入无缓冲区管道或由缓冲区管道已满时写入则会阻塞直到管道中元素被其他例程读取。同理,当管道中无元素时读取时也会阻塞到管道被其他例程写入元素

package main

import "fmt"

type person struct {
}

func main() {
    // 声明一个 int 类型的管道 channel
    // 初始化和赋值需要使用 make
    // make()
    // 这种叫做无缓冲区管道
    channel := make(chan int) 

    // go 启动一个工作例程进行写入操作
    go func() {
        // 操作,读、写 只能够往管道中写入定义时候的类型数据
        // 写操作
        // 将 1 写入 channel 管道中
        channel <- 1
    }()

    // 读操作
    // 主例程进行读取操作
    // 将 channel 管道中的数据读给 getchan 赋值过程
    // 如果主例程没有读取到数据就会造成管道阻塞死锁
    getchan := <-channel
    fmt.Println(getchan)
}

执行

[19:44:04 root@go codes]#go run channel.go 
1

1.6.3.1 无缓冲区管道读取阻塞案例

注意:

不带缓冲区管道一定要注意有一个人读取管道数据同时有一个人往管道里写入数据,如果还没有读取该管道就会发生阻塞

如下面案例阻塞 5s

package main

import (
    "fmt"
    "time"
)

type person struct {
}

func main() {
    channel := make(chan int)

    go func() {
        fmt.Println("go stat")
        // 将 1 写入管道
        channel <- 1
        fmt.Println("go 1 end", time.Now())

        // 将 2 写入管道
        channel <- 2
        fmt.Println("2 end", time.Now())
    }()
    fmt.Println("channle begin")
    // 读取 channel 管道中第一个数据也就是 1
    getchan := <-channel
    fmt.Println("channle 1 end")
    fmt.Println(getchan)

    // 读完管道中第一个数据之后,等待 5s 再来读取管道中第二个数据,所以这个时候管道就会阻塞 5s
    time.Sleep(5 * time.Second)

    // 读管道,如果没有变量来接收管道的值,默认该值会被抛弃,类似于函数返回值
    <-channel

    // 这里暂停 1s 是等待 channel 管道中的第二个数据读取完毕
    time.Sleep(1 * time.Second)
}

执行

image-20210703200242580

1.6.4 关闭管道

关闭管道直接使用 close() 内置函数,被关闭以后的管道是不能写入操作的否则报错,但是可以读取操作读取到的数据只是该管道类型的默认类型

可通过 close 函数关闭管道,关闭后的管道不能被写入,当读取到最后一个元素后可通过读取的第二个参数用于判断是否结束

image-20210516155721426

x 就是表示为从 channel 中取得值,ok 表示是否取出来了。

1.6.4.1 关闭管道写入操作

范例:

image-20210703202714416

package main

import (
    "fmt"
)

type person struct {
}

func main() {
    channel := make(chan int)

    go func() {
        // 将 1 写入管道
        channel <- 1
    }()

    // 将 channel 管道中的值读取给 getchan
    getchan := <-channel
    fmt.Println(getchan)

    // 关闭管道这里先注释
    // close(channel)

    // 关闭管道之后在往管道中写入数据,我们会发现不能有写入操作
    // 开启第二个协程
    go func() {
        fmt.Println(<-channel)
    }()
    channel <- 2
}

执行

[20:18:59 root@go codes]#go run channel.go 
1
2

但是如果我们把该管道 close() 注释取消掉,关闭之后他就不能再有写入操作,输出报错

image-20210703204657852

输出

[20:47:00 root@go codes]#go run channel.go 
1
panic: send on closed channel

goroutine 1 [running]:
main.main()
        /data/go/day7/codes/channel.go:29 +0x139
exit status 2
# 报错 恐慌:在封闭频道发送

1.6.4.2 关闭管道读取操作

关闭管道之后可以读取操作,但是读取到的数据则是该管道类型的默认值,如 int 默认值就是 0

package main

import (
    "fmt"
)

type person struct {
}

func main() {
    channel := make(chan int)

    go func() {
        // 将 1 写入管道
        channel <- 1
    }()

    // 将 channel 管道中的值读取给 getchan
    getchan := <-channel
    fmt.Println(getchan)

    // 开启一个工作协程关闭管道
    go func() {
        fmt.Println("关闭管道后读取:")
        close(channel)
    }()
    fmt.Println(<-channel)
}

执行

[20:56:39 root@go codes]#go run channel.go 
1
关闭管道后读取:
0

1.6.4.3 判断管道是否关闭

如果想要判断管道是否关闭,我们可以使用两个变量来接受管道的值

true 为 开启管道,false 为关闭管道

package main

import (
    "fmt"
)

type person struct {
}

func main() {
    channel := make(chan int)

    go func() {
        // 将 1 写入管道
        channel <- 1
    }()

    // 这里用 getchan 接收 channel 管道的值,ok 来接收的管道状态,是否为开启或者关闭
    getchan, ok := <-channel
    fmt.Println(getchan, ok)

    // 开启一个工作协程关闭管道
    go func() {
        fmt.Println("关闭管道后读取:")
        close(channel)
    }()
    
    // 这里用 v 接收 channel 管道的值,ok 来接收的管道状态,是否为开启或者关闭
    v, ok := <-channel
    fmt.Println(v, ok)
}

执行

[21:01:20 root@go codes]#go run channel.go 
1 true                  # 第一次为开启管道返回值为 true
关闭管道后读取:
0 false                 # 接着我们关闭了管道返回值为 false

1.6.5 管道遍历

管道队列由于没有下标的定义,所以取值的时候一定是按照顺序取。

channel 支持 for-range 的方式进行遍历,请注意两个细节

  1. 在遍历的时候,如果 channel 没有关闭,则会出现 deadlock 报错

  2. 在遍历的时候,如果 channel 已经关闭,则会正常遍历数据,遍历完后,就会退出遍历

为什么需要 channel 的遍历,因为当我们放入 100W 个数据 一个一个的取不得累死,所以 channel 是支持遍历的。

而且一定要用 for-range 的方式,因为如果通过 for 的话,每取一次 channel 的长度就会发生变化

注意:

我们在对 管道 进行遍历的时候需要先将管道关闭不然会报 deadlock(死锁) ,因为取完管道里的值之后还在向 管道 中取值

死锁范例:

package main

import "fmt"

type person struct {
}

func main() {

    channel := make(chan int)

    go func() {
        // 将 1,2,3 写入 channel 管道
        channel <- 1
        channel <- 2
        channel <- 3
    }()
    // 管道遍历
    for v := range channel {
        fmt.Println(v)
    }
}

执行

[21:20:43 root@go codes]#go run channel.go 
1
2
3
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
        /data/go/day7/codes/channel.go:20 +0xe5
exit status 2

# 通过执行我们会发现虽然能够将数据从管道中读出,但是由于管道中已经没有人往里面写入数据而且这个时候还有人在读数据就会出现死锁阻塞现象

那我们如何解决这个问题,使用内置函数 close 可以关闭 channel ,当 channel 关闭后,就不能在向 channel 写数据,但是仍然可以使用该 channel 读取数据。从而解决阻塞和死锁现象

close 管道之后遍历:

package main

import "fmt"


func main() {
    // 管道遍历
    channel := make(chan int)

    go func() {
        // 将 1 写入管道
        channel <- 1
        channel <- 2
        channel <- 3
        // 这里关闭管道
        close(channel)
    }()

    for v := range channel {
        fmt.Println(v)
    }
}

执行

[21:25:51 root@go codes]#go run channel.go 
1
2
3

1.6.6 带缓冲区的管道操作

带缓冲的管道是一种在被接收前能存储一个或者多个值的管道。这种类型的管道并不强制要求 goroutine 之间必须同时完成发送和接收。

带缓冲的管道会阻塞发送和接收动作的条件也会不同。只有在管道中没有要接收的值时,接收动作才会阻塞。只有在管道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

带缓冲区如下图讲解

image-20210513203024330

左边有一个 Go 的主线程,然后右边有多个协程,然后粉红色的地方有个带缓冲区管道(channel),带缓冲区管道就像一个队列一样,里面的数据会排队,带缓冲区管道本身也有大小也有容量。当我们向带缓冲区管道放东西的时候他就会按照顺序进行排列。

然后再读这个管道的时候呢,将第一个读出来,一旦读出一个数据的时候他的长度就变会变少,但是容量还是没有变化。然后后面的数据就会依次往前移动。

如果当管道里面的数据取完了之后,在对管道进行取数据就会报错阻塞。

所以管道的本质就是一个队列。

总结:

带缓冲区管道在进行取数据的时候就是取最前面的,而且数据也会减少。容量不变

范例:

package main

func main() {

    // 定义一个 channel 管道类型为 int,然后容量为 3
    var channel chan int = make(chan int, 3)
    channel <- 1
    channel <- 2
    channel <- 3
}

执行发现并没有报错和阻塞,因为我们向 channel 管道写入了三个数据,没有超过 channel 管道的容量所以不会报错。

image-20210703222819199

但是放入的值超过了管道的长度就会出现阻塞

package main

func main() {
    var channel chan int = make(chan int, 3)
    channel <- 1
    channel <- 2
    channel <- 3
    
    // 这里我在向 channel 放入一个 4 ,就会超过 channel 管道的长度
    channel <- 4
}

报错,阻塞死锁

[22:28:10 root@go codes]#go run bufferchan.go 
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /data/go/day7/codes/bufferchan.go:8 +0xa7
exit status 2

如果这个时候我把 channel 中的数据读取一个,就不会报错,因为此时的 channel 容量就恢复成了 3 ,而且带缓冲的管道读取是满足先进先出的条件

package main

import "fmt"

func main() {
    var channel chan int = make(chan int, 3)
    channel <- 1
    channel <- 2
    channel <- 3
    // 这里取出一个 channel 管道的值
    fmt.Println(<-channel)
    channel <- 4
}

执行

[22:30:44 root@go codes]#go run bufferchan.go 
1

# 执行会发现能够正常执行,就不会报错

但是当我们读取带缓冲管道的时候,读取的值超过了管道当前存储的值也会报错死锁,因为读取完毕之后当前管道的容量里面并没有多余的值。

package main

import "fmt"

func main() {
    var channel chan int = make(chan int, 3)
    
    // 往 channel 管道写入 4 个值
    channel <- 1
    channel <- 2
    channel <- 3
    channel <- 4

    // 对 channel 管道读取 5 次操作
    fmt.Println(<-channel)
    fmt.Println(<-channel)
    fmt.Println(<-channel)
    fmt.Println(<-channel)
    fmt.Println(<-channel)
}

执行

[22:36:49 root@go codes]#go run bufferchan.go 
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /data/go/day7/codes/bufferchan.go:10 +0xb9
exit status 2

# 执行报错,死锁阻塞

1.6.6.1 带缓冲管道结和 goroutine 使用

在工作中更多的时管道结合 goroutine 来使用

第一种:读取方法,通过 close() 内函数关闭管道写入数据时进行读取操作

package main

import "fmt"

func main() {
    var channel chan int = make(chan int, 3)

    // 开启一个 goroutine , 里面的 for 循环向 channel 管道中写入 10 个数据
    go func() {
        for i := 0; i < 10; i++ {
            channel <- i
        }

        // 数据写到管道之后即使关闭管道,不然会出现死锁阻塞
        close(channel)
    }()

    // 读取管道中的值
    for v := range channel {
        fmt.Println(v)
    }
}

执行

[22:58:39 root@go codes]#go run bufferchan.go 
0
1
2
3
4
5
6
7
8
9

第二种:读取方法,由于我们已知往管道中放入的值容量个数,所以我在取的时候也取同样的容量个数

package main

import "fmt"

func main() {
    var channel chan int = make(chan int, 3)

    // 开启一个 goroutine , 里面的 for 循环向 channel 管道中写入 10 个数据
    go func() {
        for i := 0; i < 10; i++ {
            channel <- i
        }
    }()

    // 已知放入 channel 管道中的值个数为 10,读取的时候也读取 10 个
    for i := 0; i < 10; i++ {
        fmt.Println(<-channel)
    }

}

执行

[22:58:47 root@go codes]#go run bufferchan.go 
0
1
2
3
4
5
6
7
8
9

1.6.6.2 通过管道来实现例程间的通讯

我们可以通过这种方式实现 sync.WaitGroup 的功能,用得比较多的还是 sync.WaitGroup

package main

import (
    "fmt"
)

func main() {
    fmt.Println("start")
    var chanInt chan int = make(chan int)

    // 开启一个工作例程
    go func() {
        for i := 'A'; i <= 'D'; i++ {
            fmt.Println(string(i))
        }
        // 往管道中写入 0 值
        chanInt <- 0
    }()

    // 开启二个工作例程
    go func() {
        for i := 0; i <= 5; i++ {
            fmt.Println(i)
        }
        // 往管道中写入 0 值
        chanInt <- 0
    }()

    for i := 0; i < 2; i++ {
        // 读取管道两次,因为有两个协程里面都往管道中写入了数据
        <-chanInt
    }

    fmt.Println("end")
}

执行

[23:21:19 root@go codes]#go run testchan.go 
start
0
1
2
3
4
5
A
B
C
D
end

# 两个例程中的数据输出

1.6.7 只读和只写管道

我们平时在定义管道的时候都不会定义一个只读只写的管道,一般是在函数参数传递的时候。

比如说我有一个函数只让他去写那我把管道传递到函数里面的话只把他传成只写的管道。

如果有一个函数在使用管道需要读的话,那我们在进行函数参数传递的时候只定义成只读即可。

我们在定义管道的时候一般不会去定义只读或者只写,一般我都会把读和写分成不同功能去实现

好处:

防止在函数内部有一些其他的错误

只读:  <-chan
只写:  chan<-

范例:

可以在函数参数时声明管道为 chan<- (只写)<-chan(只读)

package main

import "fmt"

// 定义 in 函数传入只写管道
func in(channel chan<- int) {
    channel <- 1
    channel <- 2
}

// 定义 in 函数传入只读管道
func out(channel <-chan int) {
    fmt.Println(<-channel)
    fmt.Println(<-channel)
}

func main() {
    var channel chan int = make(chan int, 3)
    in(channel)
    out(channel)
}

执行

[12:30:30 root@go codes]#go run only.go 
1
2

定义只读管道和只写管道

package main

import "fmt"

// 定义 in 函数传入只写管道
func in(channel chan<- int) {
    channel <- 1
    channel <- 2
}

// 定义 in 函数传入只读管道
func out(channel <-chan int) {
    fmt.Println(<-channel)
    fmt.Println(<-channel)
}

func main() {
    // 定义 channel 读写管道
    var channel chan int = make(chan int, 3)

    // 定义只写管道管道类型为 channel
    var wchannel chan<- int = channel
    // 定义只读管道
    var rchannel <-chan int = channel

    // 传入只写管道
    in(wchannel)

    // 传入只读管道
    out(rchannel)
}

执行

[12:43:03 root@go codes]#go run only.go 
1
2

1.6.8 select-case 语句 多路复用

select-case 一般和 for 循环结合使用

当写入无缓冲区管道或由缓冲区管道已满时写入则会阻塞直到管道中元素被其他例程读取。

同理,当管道中无元素时读取时也会阻塞到管道被其他例程写入元素,若需要同时对多个管道进行监听(写入或读取),则可以使用 select-case 语句

select 语句自上到下执行 case 语句中对管道的读取和写入,当操作成功则执行对应子语句,否则执行下一个 case 语句,当所有 case 都失败,则执行 default 语句,default 语句可省略

Select-case 语句监听每个 case 语句中管道的读取,当某个 case 语句中管道读取成功则执行对应子语句

语法:

select {
    case <- channelA:
        代码块
    case <- channelB:
        代码块
    default:        //  当 channelA 和 channelB 管道中都没有任何执行操作就执行 default 语句
        默认代码块    
}

// 只要有一个管道执行成功(无论读写关闭操作)就执行对应管道的操作

没有对管道有任何操作范例:

代码中我们没有对 channelA、channelB 有任何读写操作所以执行的是默认代码块

package main

import "fmt"

func main() {
    // 在多个管道中只要有一个操作成功,也就是说这个管道可以用来读也可以用来写
    // 只要有一个成功就执行相应逻辑
    channelA := make(chan int)
    channelB := make(chan int)

    select {
    case <-channelA:
        fmt.Println("AAA")
    case <-channelB:
        fmt.Println("BBB")
    default:
        fmt.Println("default")
    }
}

执行

[12:44:31 root@go codes]#go run selectcase.go 
default

# 我们看到上面代码中我们没有对 channelA、channelB 有任何读写操作所以执行的是默认代码块

对管道操作范例:

package main

import (
    "fmt"
    "time"
)

func main() {
    // 在多个管道中只要有一个操作成功,也就是说这个管道可以用来读也可以用来写
    // 只要有一个成功就执行相应逻辑
    channelA := make(chan int)
    channelB := make(chan int)

    go func() {
        // 往 channelA 中写入 1
        channelA <- 1
    }()

    // 等待两秒让工作例程执行完毕
    time.Sleep(time.Second * 2)

    select {
    case <-channelA:
        fmt.Println("AAA")
    case <-channelB:
        fmt.Println("BBB")
    default:
        fmt.Println("default")
    }
}

执行

[13:17:34 root@go codes]#go run selectcase.go 
AAA

# 通过执行我们会发现已经执行了 channelA 语句块中的代码

1.6.8.1 对管道中的值进行操作

当我们把数据写入到了管道中,但是我们还想要管道中的这个值怎么做呢?

package main

import (
    "fmt"
    "time"
)

func main() {
    // 在多个管道中只要有一个操作成功,也就是说这个管道可以用来读也可以用来写
    // 只要有一个成功就执行相应逻辑
    channelA := make(chan int)
    channelB := make(chan int)

    go func() {
        channelA <- 1
    }()

    // 等待两秒让工作例程执行完毕
    time.Sleep(time.Second * 2)
    select {

    // 这里进行赋值,将 channelA 管道中的值赋值给 v ,ok
    case v, ok := <-channelA:
        fmt.Println("AAA", v, ok)
    case <-channelB:
        fmt.Println("BBB")
    default:
        fmt.Println("default")
    }
}

执行

[13:17:37 root@go codes]#go run selectcase.go 
AAA 1 true

# 执行发现输出了 channelA 管道中的值 1 返回 true 表示该管道是开启状态

当管道关闭之后我们也能对关闭的管道进行操作,执行 select 语句

范例

package main

import (
    "fmt"
    "time"
)

func main() {
    // 在多个管道中只要有一个操作成功,也就是说这个管道可以用来读也可以用来写
    // 只要有一个成功就执行相应逻辑
    channelA := make(chan int)
    channelB := make(chan int)

    go func() {
    
        // 这里将 channelA 管道关闭
        close(channelA)
    }()

    // 等待两秒让工作例程执行完毕
    time.Sleep(time.Second * 1)
    select {

    // 这里进行赋值,将 channelA 管道中的值赋值给 v ,ok
    case v, ok := <-channelA:
        fmt.Println("AAA", v, ok)
    case <-channelB:
        fmt.Println("BBB")
    default:
        fmt.Println("default")
    }
}

执行

[13:31:35 root@go codes]#go run selectcase.go 
AAA 0 false

# 0 为 channelA 管道默认值, false 表示关闭了该管道

1.6.8.2 default 语句

default 语句表示如果我们 select 语句中的所有管道都没有任何操作就会默认执行 default 语句

package main

import (
    "fmt"
)

func main() {
    // 在多个管道中只要有一个操作成功,也就是说这个管道可以用来读也可以用来写
    // 只要有一个成功就执行相应逻辑
    channelA := make(chan int)
    channelB := make(chan int)

    select {
    case <-channelA:
        fmt.Println("AAA")
    case <-channelB:
        fmt.Println("BBB")
    default:
        fmt.Println("default")
    }
}

// 上述代码中我们没有对 channelA 和 channelB 这两个管道有任何操作所以就会执行 default 语句

执行

[13:40:37 root@go codes]#go run selectcase.go 
default

1.6.8.3 select-case 超时机制

可以通过 select-case 实现对执行操作超时的控制,比如说有一个程序他有一个超时时间

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// 这有一个 task 函数用来接收结果的,将计算的结果写道 result 管道中
func task(result chan<- int64) {
    // 给随机数种子
    rand.Seed(time.Now().Unix())
    timeInt := rand.Intn(10)

    // 随机 10 秒内休眠,模拟程序的执行时间
    fmt.Println("sleep:", timeInt)
    time.Sleep(time.Duration(timeInt) * time.Second)

    // 将当前时间写入到 result 管道中
    result <- time.Now().Unix()
}

func main() {
    // 定义 int64 类型管道
    var result chan int64 = make(chan int64)
    fmt.Println(time.Now()) // 获取当前时间

    // 开启协程将 result 管道传递给 task 函数
    go task(result)

    fmt.Println("result:", <-result) // 读取管道的值
    fmt.Println(time.Now())          // 表示程序执行完毕的时间
}

执行:

[14:01:44 root@go codes]#go run timeout.go 
2021-07-04 14:02:29.765109625 +0800 CST m=+0.000064071
sleep: 9
result: 1625378558
2021-07-04 14:02:38.76740707 +0800 CST m=+9.002361546

# 这个程序模拟的超时时间为 9 秒

如果我们有一个限制,这个任务必须要在 3 秒之内结束否则就是 timeout 我们又该怎么做呢?

这个时候就需要使用到我们的 select-case

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func task(result chan<- int64) {
    // 定义 10 以内的随机数
    rand.Seed(time.Now().Unix())
    timeInt := rand.Intn(10)

    fmt.Println("sleep:", timeInt)

    // 休眠 10 以内的随机数秒数
    time.Sleep(time.Duration(timeInt) * time.Second)
    result <- time.Now().Unix()
}

func main() {
    var result chan int64 = make(chan int64)
    var timeout chan int = make(chan int)
    fmt.Println(time.Now())

    // 开启协程执行执行 test 函数
    go task(result)

    // 开启第二个协程,里面定义了执行 3 秒,否则关闭 timeout 管道
    go func() {
        time.Sleep(3 * time.Second)
        close(timeout)
    }()

    // 通过 select 来进行判断,如果 result 先执行操作就代表没有 timeout
    // 如果 timeout 管道先执行,就表示 result 管道超时
    select {
    case r := <-result:
        fmt.Println("success:", r)
    case <-timeout:
        fmt.Println("TimeOut")
    }
    fmt.Println(time.Now())
}

执行

# 第一次执行 task 函数中暂停了 8 秒表示该程序已经超时
[14:11:34 root@go codes]#go run timeout.go 
2021-07-04 14:15:49.963957319 +0800 CST m=+0.000221937
sleep: 8
TimeOut
2021-07-04 14:15:52.967654548 +0800 CST m=+3.003919156

# 第二次执行,task 暂停了 3 秒表示没有超时,执行成功
[14:15:52 root@go codes]#go run timeout.go 
2021-07-04 14:16:41.930447437 +0800 CST m=+0.000074461
sleep: 3
success: 1625379404
2021-07-04 14:16:44.932830393 +0800 CST m=+3.002457447

当然超时机制的这个功能我们可以通过 time.After 来实现

1.6.8.4 time.After 实现超时机制

Go 语言 time 包实现了 After 函数,可以用于实现超时机制,After 函数返回一个只读管道。

After 其实就是一个生成管道的工具,比如说多长时间后生成一个管道

func time.After(d time.Duration) <-chan time.Time

范例

package main

import (
    "fmt"
    "time"
)

func main() {
    // 打印当前时间
    fmt.Println("Start:", time.Now())

    // 这里定义的是 3 秒之后返回一个管道,这个管道会阻塞 3 秒
    fmt.Println("END:", <-time.After(3*time.Second))
}

执行

[14:38:16 root@go codes]#go run time.go 
Start: 2021-07-04 14:38:34.864763097 +0800 CST m=+0.000068870
END: 2021-07-04 14:38:37.86714669 +0800 CST m=+3.002452513

# 3 秒之后执行

这个时候我们就可以用 time.After 函数去替代刚才的 timeout

image-20210704144533429

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func task(result chan<- int64) {
    // 定义 10 以内的随机数
    rand.Seed(time.Now().Unix())
    timeInt := rand.Intn(10)

    fmt.Println("sleep:", timeInt)

    // 休眠 10 以内的随机数秒数
    time.Sleep(time.Duration(timeInt) * time.Second)
    result <- time.Now().Unix()
}

func main() {
    var result chan int64 = make(chan int64)
    fmt.Println(time.Now())

    // 开启协程执行执行 test 函数
    go task(result)

    // 通过 select 来进行判断,如果 result 先执行操作就代表没有 timeout
    // 如果 timeout 管道先执行,就表示 result 管道超时
    select {
    case r := <-result:
        fmt.Println("success:", r)

    // 这里直接使用 time.After() 函数,三秒之后返回一个管道
    case <-time.After(3 * time.Second):
        fmt.Println("TimeOut")
    }
    fmt.Println(time.Now())
}

执行

# 第一次执行 0 秒没有超时执行成功
[14:45:41 root@go codes]#go run timeoutv2.go 
2021-07-04 14:45:42.490520707 +0800 CST m=+0.000172494
sleep: 0
success: 1625381142
2021-07-04 14:45:42.490832221 +0800 CST m=+0.000483988

# 第二次执行 8 秒超过了 time.After() 函数中的 3 秒返回的管道操作,直接超时
[14:45:42 root@go codes]#go run timeoutv2.go 
2021-07-04 14:45:43.985380324 +0800 CST m=+0.000175010
sleep: 8
TimeOut
2021-07-04 14:45:46.989789292 +0800 CST m=+3.004583937

1.6.8.5 time.Tick 函数每隔多少秒返回一个管道

func Tick(d Duration) <-chan Time

time.Tick() 函数能够每个多少秒返回一个只读管道,一般用于定时任务,比如说每个多长时间执行一次程序就可以用该函数

package main

import (
    "fmt"
    "time"
)

func main() {
    // 打印当前时间
    fmt.Println("Start:", time.Now())
    
    // 通过 for-range 遍历 time.Tick() 每隔 3 秒把返回的管道赋值给 now 遍历
    for now := range time.Tick(3 * time.Second) {
        fmt.Println("sleep:", now)
    }
}

执行

[14:50:54 root@go codes]#go run time.go 
Start: 2021-07-04 14:51:03.00490193 +0800 CST m=+0.000108695
sleep: 2021-07-04 14:51:06.006506605 +0800 CST m=+3.001713470
sleep: 2021-07-04 14:51:09.007667968 +0800 CST m=+6.002874733
sleep: 2021-07-04 14:51:12.008240217 +0800 CST m=+9.003446972

# 通过输出我们会发现每隔 3 秒执行一次

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇