
并发:在并发中,这些任务可以在同一个时间段内交替执行,每个任务都有可能在任意时刻被暂停或切换,以便给其他任务执行的机会。并发可以提高系统的吞吐量和资源利用率,但并不一定意味着同时进行多个任务的实际并行执行。
并行:指系统中多个任务同时进行实际的并发执行,每个任务都有自己的执行线程或处理器核心。在并行中,多个任务可以同时进行,彼此之间相互独立,互不干扰。并行可以通过多核处理器、分布式系统等实现,以提高任务的执行速度和效率。
IO 密集型服务的瓶颈不在 CPU 处理速度,而在于尽可能快速的完成高并发、多连接下的数据读写。
协程 Coroutines 是一种比线程更加轻量级的微线程,用户态线程。类比一个进程可以拥有多个线程,一个线程也可以拥有多个协程,因此协程又称微线程和纤程。
协程有独立的栈空间,但是共享堆空间
协程是轻量级的线程,与传统的进程和线程相比,协程的最大特点是 "轻"!可以轻松创建上百万个协程而不会导致系统资源衰竭。
线程本身是有一定大小的,一般OS线程栈大小为**2MB,**且线程在创建和上下文切换的时候是需要消耗资源的,会带来性能损耗,所以在我们用到多线程技术的时候,我们往往会通过池化技术,即创建线程池来管理一定数量的线程。
在go语言中,一个goroutine栈在其生命周期开始时占用空间很小(一般2KB),并且栈大小可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,但是一般不会用到这么大。所以在Go语言中一次创建成千上万,甚至十万左右的goroutine理论上也是可以的。
func()
go func()
package main
import (
"fmt"
"sync"
"time"
)
func say(id string) {
time.Sleep(time.Second)
fmt.Println("I am done ! id: " + id)
wg.Done() //wg中值 - 1
}
var wg sync.WaitGroup
func main() {
wg.Add(2) // wait two groutines
go func(id string) {
fmt.Println(id)
wg.Done()
}("hello")
go say("Hello")
wg.Wait() // 等待所有任务完成,卡住如果wg不是0
fmt.Println("exit")
}
package main
import (
"fmt"
"time"
)
func main() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("main recover:%v\n", e)
}
}()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("sub recover:%v\n", e)
}
}()
panic("sub func panic!!!") // 发生panic后,不会打印
fmt.Println("111")
}()
panic("main func panic!!!")
panic("main func panic!!!") // 一个recover,只能捕获一次pannic
fmt.Println("222") // 发生panic后,不会打印
time.Sleep(2 * time.Second)
}
用recover捕获异常时,只能捕获当前goroutine的panic,不能捕获其他goroutine发生的panic。
main panic需要在main中设置对应的recover,goroutine中的panic也需要有对应的设置。
因为开启 goroutine 的唯一方法就是将 go
放在函数调用前面,所以当我们想要启动 goroutine 时,我们经常使用 匿名函数(anonymous functions),这样可以实现协程运行部分语句。
Sleep避免运行完直接退出,没有等待goroutine。
var ch = make(chan int)
type RChannel= <-chan int // 定义单向读类型,chan出去,读出来
var rec RChannel = ch
type SChannel = chan<- int // 定义单向写类型,指向chan写进去
var send SChannel = ch
chan T // 可以接收和发送类型为 T 的数据
chan<- float64 // 只可以用来发送 float64 类型的数据
<-chan int // 只可以用来接收 int 类型的数据
close(ch)
,否则会deadlock。ch := make(chan int)
ch := make(chan int, 100)
close(ch)
for val := range ch {
fmt.Println(val)
}
如果channel c已经被关闭,继续往它发送数据会导致panic: send on closed channel
:
从这个关闭的channel中不但可以读取出已发送的数据,还可以不断的读取nil。
go func() {
for i := 0; i < 5; i++ {
v,ok := <-ch // 判断句式读取,检查通道是否关闭
if ok{
fmt.Printf("v=%d\n", v)
}else{
fmt.Printf("channel数据已读完\n")
}
}
}()
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int, 5)
ch <- 1
ch <- 2
close(ch)
go func() {
for v := range ch {
fmt.Printf("v=%d\n", v)
}
}()
time.Sleep(2 * time.Second)
}
range
读取,channel关闭后for循环会跳出,避免读取关闭后的零值。默认情况下,发送和接收会一直阻塞着,直到另一方准备好。这种方式可以用来在gororutine中进行同步,而不必使用显示的锁或者条件变量,channel是并发安全的。
channel在用make函数创建初始化时,会在堆上分配一个runtime.hchan类型数据结构,并返回指向堆上这块hchan内存区域,所以channel是一个引用类型,channel本身就是一个指针。
为什么在堆上创建?
channel用来实现goroutine间通信,生命周期不局限于某个具体函数内。
channel+select
方式来控制。
channel+select
就会比较麻烦,这时就可以通过 context 来实现。type Context interface {
// 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
Done() <-chan struct{}
// 在 channel Done 关闭后,返回 context 取消原因
Err() error
// 返回 context 是否会被取消以及自动取消时间(即 deadline)
Deadline() (deadline time.Time, ok bool)
// 获取 key 对应的 value
Value(key interface{}) interface{}
}
Context
是一个接口,定义了 4 个方法,它们都是幂等
的。也就是说连续多次调用同一个方法,得到的结果都是相同的。
Done() <-chan struct{}
这里返回是一个只读的Channel,对于没有传递值的Channel,读不出任何的值,当Channel关闭时,可以读取对应类型的nil值,作为channel关闭的信号,表明要尽快做退出处理。
Err()
返回一个错误,表示 channel 被关闭的原因。例如是被取消,还是超时。
Deadline()
返回 context 的截止时间,通过此时间,函数就可以决定是否进行接下来的操作,如果时间太短,就可以不往下做了,否则浪费系统资源。当然,也可以用这个 deadline 来设置一个 I/O 操作的超时时间。
Value()
获取之前设置的 key 对应的 value。
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
Done()
方法返回一个只读的 channel,所有相关函数监听此 channel。一旦 channel 关闭,通过 channel 的“广播机制”,所有监听者都能收到。type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
通常被包装后,通过两个导出函数导出
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// 注意首字母大写
func Background() Context {
return background
}
func TODO() Context {
return todo
}
type cancelCtx struct {
Context
// 保护之后的字段
mu sync.Mutex
done chan struct{}
children map[canceler]struct{}
err error
}
一个可以取消的 Context,实现了 canceler 接口。它直接将接口 Context 作为它的一个匿名字段,这样,它就可以被看成一个 Context。
源码里对外提供了一个创建根节点 context 的函数
ctx := context.Background()
ctx := context.TODO()
有了根节点 context,又提供了四个函数创建子节点 context:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
使用建议
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
go Watch(ctx, "goroutine1")
go Watch(ctx, "goroutine2")
time.Sleep(6 * time.Second) // 让goroutine1和goroutine2执行6s
fmt.Println("end watching!!!")
cancel() // 通知goroutine1和goroutine2关闭
time.Sleep(1 * time.Second)
}
func Watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Printf("%s exit!\n", name) // 主goroutine调用cancel后,会发送一个信号到ctx.Done()这个channel,这里就会收到信息
return
default:
fmt.Printf("%s watching...\n", name)
time.Sleep(time.Second)
}
}
}
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithDeadline(context.Background(),time.Now().Add(4*time.Second)) // 设置超时时间4当前时间4s之后
defer cancel()
go Watch(ctx, "goroutine1")
go Watch(ctx, "goroutine2")
time.Sleep(6 * time.Second) // 让goroutine1和goroutine2执行6s
fmt.Println("end watching!!!")
}
func Watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Printf("%s exit!\n", name) // 4s之后收到信号
return
default:
fmt.Printf("%s watching...\n", name)
time.Sleep(time.Second)
}
}
}
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
go Watch(ctx, "goroutine1")
go Watch(ctx, "goroutine2")
time.Sleep(6 * time.Second) // 让goroutine1和goroutine2执行6s
fmt.Println("end watching!!!")
}
func Watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Printf("%s exit!\n", name) // 主goroutine调用cancel后,会发送一个信号到ctx.Done()这个channel,这里就会收到信息
return
default:
fmt.Printf("%s watching...\n", name)
time.Sleep(time.Second)
}
}
}
package main
import (
"context"
"fmt"
"time"
)
func func1(ctx context.Context) {
fmt.Printf("name is: %s", ctx.Value("name").(string))
}
func main() {
ctx := context.WithValue(context.Background(), "name", "zhangsan")
go func1(ctx)
time.Sleep(time.Second)
}
package main
import (
"context"
"fmt"
"time"
)
func doSomething(ctx context.Context) {
select {
case <-time.After(5 * time.Second):
fmt.Println("finish doing something")
case <-ctx.Done():
err := ctx.Err()
if err != nil {
fmt.Println(err.Error())
}
}
}
func main() {
ctx := context.Background() // root context, return nil context
//todoCtx := context.TODO() // 不确定context类型时使用
ctx, cancel := context.WithCancel(ctx)
go func() {
time.Sleep(6 * time.Second)
cancel()
}()
doSomething(ctx)
}
Goroutine之间可以通过channel来写作,同样也能像传统的对共享内存并发安全
最基础的,是通过time.Sleep
方法,让主goroutine等待一段时间,以便子goroutine能够执行完,显然这个办法不好,不能确定子goroutine需要的时间。
使用channel
,主goroutine向子goroutine里发送struct{}
,直到数据全部取出完毕才能继续后面的逻辑,这样就可以实现等待各个goroutine执行完。显然不是好的办法,管道需要内存开销,并且需要确定执行的次数。
sync.WaitGroup
,使用sync包下的waitgroup来实现并发任务的同步和协程任务。
sync.Once
,并不会在程序启动的时候初始化,而是在第一次用的它的时候才会初始化,并且只初始化这一次,初始化之后驻留在内存里,这就非常适合我们之前提到的配置文件加载场景。
InitConfig()
获取Config 指针的时候才会执行once.Do(func(){instance = &Config{} })
语句,执行完之后instance就驻留在内存中,后面再次执行InitConfig()的时候,就直接返回内存中的instance。// 声明配置结构体Config
type Config struct{}
var instance Config
var once sync.Once // 声明一个sync.Once变量
// 获取配置结构体
func InitConfig() *Config {
once.Do(func(){
instance = &Config{}
})
return instance
}
var mtx sync.Mutex
mtx.Lock()
mtx.Unlock()
defer mtx.Unlock()
。func (rw *RWMutex) Lock() // 对写锁加锁
func (rw *RWMutex) Unlock() // 对写锁解锁
func (rw *RWMutex) RLock() // 对读锁加锁
func (rw *RWMutex) RUnlock() // 对读锁解锁
读写锁的使用遵循以下几个法则:
通俗理解就是可以多个goroutine同时读,但是只有一个goroutine能写,共享资源要么在被一个或多个goroutine读取,要么在被一个goroutine写入, 读写不能同时进行。
当有读锁,写的时候会阻塞。读写之间互斥。
go语言内置的Map并不是线程安全的,在多个goroutine同时操作map的时候,会有并发问题。
原子操作就是这一系列的操作在cpu上执行是一个不可分割的整体,显然要么全部执行,要么全部不执行,不会受到其他操作的影响,也就不会存在并发问题。
Go语言的select语句,是用来起一个goroutine监听多个Channel的读写事件,提高从多个Channel获取信息的效率,相当于也是单线程处理多个IO事件,其思想基本相同。
select {
case <- channel1: // 如果从channel1读取数据成功,执行case语句
do ...
case channel2 <- 1: // 如果向channel2写入数据成功,执行case语句
do ...
default: // 如果上面都没有成功,进入default处理流程
do ...
}
Time是一种一次性时间定时器,即在未来某个时刻,触发的事件只会执行一次。
Timer结构里有一个Time类型的管道C,主要用于事件通知。在未到达设定时间的时候,管道内没有数据写入,一直处于阻塞状态,到达设定时间后,会向管道内写入一个系统事时间,触发事件。
package main
import (
"fmt"
"time"
)
func main() {
timer := time.NewTimer(2 * time.Second) //设置超时时间2s
<-timer.C
fmt.Println("after 2s Time out!")
}
package main
import (
"fmt"
"time"
)
func main() {
timer := time.NewTimer(2 * time.Second) //设置超时时间2s
res := timer.Stop()
fmt.Printf(res)
}
返回值:
对于已经过期或者是已经停止的timer,可以通过重置方法激活使其继续生效。
time.AfterFunc参数为超时时间d和一个具体的函数f,返回一个Timer的指针,作用在创建出timer之后,在当前goroutine,等待一段时间d之后,将执行f。
根据函数定义可以看到,after函数经过时间短d之后会返回timer里的管道,并且这个管道会在经过时段d之后写入数据,调用这个函数,就相当于实现了定时器。
对比timer,Ticker对象会每隔一段时间d就向该通道发送当时的时间,根据这个管道消息来触发事件,但是从定义完成,就会每隔一段时间触发,只有关闭Ticker对象才不会继续发送时间消息。
go语言虽然有着高效的GMP调度模型,理论上支持成千上万的goroutine,但是goroutine过多,对调度,gc以及系统内存都会造成压力,这样会使我们的服务性能不升反降。常用做法可以用池化技术,构造一个协程池,把进程中的协程控制在一定的数量,防止系统中goroutine过多,影响服务性能。
协程池简单理解就是有一个池子一样的东西,里面装这个固定数量的goroutine,当有一个任务到来的时候,会将这个任务交给池子里的一个空闲的goroutine去处理,如果池子里没有空闲的goroutine了,任务就会阻塞等待。所以协程池有三个角色Worker,Task,Pool。
Worker:用于执行任务的goroutine
Task: 具体的任务
Pool: 池子
func NewTask(funcArg func() error) *Task // 创建任务
func NewPool(workerNum int, taskNum int) *Pool // 创建固定协程数量、任务队列长度的协程池
func (p *Pool) AddTask(task *Task) // 添加任务到任务队列
func (p *Pool) Run() //