Go语言并发编程

编程 · 02-22 · 297 人浏览

goroutine(go程)的概念类似于线程,但goroutine是由Go运行时(runtime)调度和管理的。Go程序会智能地将goroutine中的任务合理地分配给每个CPU。

go程占用的系统资源远远小于线程,一个go程大约需要4K-5K的内存资源,一个程序可以启动大量的go程。

协程

在Go中,协程(Coroutine)被称为goroutine,是单线程下的并发,又称微线程。

package main

import (
  "fmt"
  "time"
)

func test() {
  for i := 0; i < 10; i++ {
    fmt.Println(i)
    time.Sleep(time.Second)
  }
}

func main() {
  go test() // 开启一个协程

  for i := 100; i < 110; i++ {
    fmt.Println(i)
    time.Sleep(time.Second)
  }
}

主死从随

如果主线程退出,协程即使没有执行完毕,也会退出。

func test() {
  for i := 0; i < 1000; i++ {
    fmt.Println(i)
    time.Sleep(time.Second)
  }
}

func main() {
  go test() // 开启一个协程

  for i := 1000; i < 1010; i++ {
    fmt.Println(i)
    time.Sleep(time.Second)
  }
}

WaitGroup用于等待一组线程结束。父线程调用Add方法来设定应等待的线程数量,每个被等待线程在结束时调用Done方法。同时,
主线程里可以调用Wait方法阻塞至所有线程结束,使得主线程在子协程结束后再自动结束。

var wg sync.WaitGroup // 只定义无需赋值

func main() {
  // wg.Add(5) // 如果知道协程数量,可以直接设置计数器
  for i := 1; i <= 5; i++ {
    wg.Add(1) // 协程开始,计数器加1
    go func(n int) {
      defer wg.Done() // 协程结束,计数器减1
      fmt.Println(n)
    }(i)
  }

  wg.Wait() // 主线程一直在阻塞,等待所有协程结束,直到wg计数器为0
}

互斥锁

多个协程操作同一数据,可能会导致不可预测的结果或者程序错误。

Mutex为互斥锁,Lock()加锁,Unlock()解锁,加锁后不能再次对其进行加锁,直到对其解锁后才能再次加锁,适用于读写不确定场景,即读写次数没有明显的区别。所以说,其性能、效率相对来说比较低。

var wg sync.WaitGroup
var lock sync.Mutex
var count int

func add() {
  defer wg.Done()
  for i := 0; i < 100000; i++ {
    lock.Lock() // 加锁
    count++
    lock.Unlock() // 解锁
  }
}

func sub() {
  defer wg.Done()
  for i := 0; i < 100000; i++ {
    lock.Lock() // 加锁
    count--
    lock.Unlock() // 解锁
  }
}

func main() {
  wg.Add(2)
  go add()
  go sub()
  wg.Wait()
  println(count) // 0
}

读写锁

RWMutex是一个读写锁,其经常用于读次数远远多于写次数的场景。在读的时候,数据之间不产生影响,写和读之间才会产生影响。

var wg sync.WaitGroup
var lock sync.RWMutex

func read() {
  defer wg.Done()
  lock.RLock()
  println("read start")
  time.Sleep(time.Second)
  println("read stop")
  lock.RUnlock()
}

func write() {
  defer wg.Done()
  lock.Lock()
  println("write start")
  time.Sleep(time.Second * 2)
  println("write stop")
  lock.Unlock()
}

func main() {
  wg.Add(7)
  for i := 0; i < 5; i++ {
    go read()
  }
  go write()
  go write()

  wg.Wait()
}

原子操作

加锁操作性能开销大,原子操作性能由于加锁操作。

package main

import (
  "fmt"
  "sync"
  "sync/atomic"
  "time"
)

var x int64
var l sync.Mutex
var wg sync.WaitGroup

func mutexAdd() {
  l.Lock()
  x++
  l.Unlock()
  wg.Done()
}

func atomicAdd() {
  atomic.AddInt64(&x, 1)
  wg.Done()
}

func main() {
  start := time.Now()
  for i := 0; i < 1000000; i++ {
    wg.Add(1)
    // go mutexAdd() // 加锁版add函数
    go atomicAdd() // 原子操作版add函数
  }
  wg.Wait()

  end := time.Now()
  fmt.Println(x)
  fmt.Println(end.Sub(start))
}

提前退出go程

func main() {
  go func() {
    func() {
      println("子go程内部函数")
      // return // 退出当前函数
      // os.Exit(-1) // 退出整个程序
      runtime.Goexit() // 退出当前go程
    }()

    println("子go程退出")
  }()

  println("主go程")
  time.Sleep(time.Second)
  println("主go程退出")
}

runtime包

runtime.Gosched()让出CPU时间片,重新等待安排任务:

func main() {
  go func(s string) {
    for i := 0; i < 3; i++ {
      fmt.Println(s)
    }
  }("world")

  for i := 0; i < 3; i++ {
    runtime.Gosched() // 切一下,再次分配任务
    fmt.Println("hello")
  }
}

runtime.GOMAXPROCS():
Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码,默认值是机器上的CPU核心数,通过runtime.GOMAXPROCS()函数可设置当前程序并发时占用的CPU逻辑核心数。

Go 1.5版本之前,默认使用的是单核心执行。Go 1.5版本之后,默认使用全部的CPU逻辑核心数,可以通过将任务分配到不同的CPU逻辑核心上实现并行的效果。

管道

管道(channel)本质是一个数据结构-队列。数据是先进先出的,多协程访问时,不需要加锁,channel本身就是线程安全的。管道是有类型的,一个string管道只能存放string类型数据。

WaitGroup、Mutex、Cond是传统同步机制,可以使用管道来等待goroutine结束。

func main() {
  // 定义一个 int 类型的管道
  var intChan chan int
  intChan = make(chan int, 3) // 初始化管道,容量为 3
  intChan <- 1                // 向管道中写入数据
  intChan <- 2

  close(intChan) // 管道关闭后,不能再写入数据,但仍然可以读取数据

  n1 := <-intChan // 从管道中读取数据
  n2 := <-intChan

  println(n1, n2)
}

默认情况下,管道是双向的,即可读可写。若想让管道只写:var intChan chan<- int,只读:var intChan <-chan int

func producer(out chan<- int) {
  for i := 0; i < 10; i++ {
    out <- i
    println("生产", i)
  }
}

func consumer(in <-chan int) {
  for i := range in {
    println("消费", i)
  }
}

func main() {
  numChan := make(chan int, 5)
  go producer(numChan) // 双向管道可以赋值给同类型单向管道,反之不行
  go consumer(numChan)
  time.Sleep(time.Second * 2)
}

管道遍历

func main() {
  var intChan chan int
  intChan = make(chan int, 100)
  for i := 0; i < 100; i++ {
    intChan <- i
  }

  // 遍历前要关闭管道,否则会出现死锁(for range会一直等待)
  close(intChan)
  for v := range intChan {
    println("value:", v)
  }
}

select

解决多个管道的选择问题,也可以叫做多路复用,可以从多个管道中随机公平地选择一个来执行。case后面必须进行的是io操作,不能是等值,随机去选择一个io操作。防止select被阻塞住,加入default。

func main() {
  intChan := make(chan int, 1)
  go func() {
    time.Sleep(time.Second * 10)
    intChan <- 10
  }()

  strChan := make(chan string, 1)
  go func() {
    time.Sleep(time.Second * 15)
    strChan <- "hello"
  }()

  select {
  case v := <-intChan:
    println("int value:", v)
  case v := <-strChan:
    println("string value:", v)
  default:
    println("no value received")
  }
}

无缓冲管道

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

func recv(c chan int) {
  ret := <-c // 接收数据
  fmt.Println("recv:", ret)
}

func main() {
  ch := make(chan int)
  go recv(ch) // 启用goroutine从管道接收数据
  ch <- 100   // 向管道发送数据
  fmt.Println("main: send 100 to channel")
}

有缓冲管道

只要管道容量大于零,那么该管道就是有缓冲管道,管道容量表示管道中能存放元素的数量。

管道总结

  • 当管道写满了,写阻塞
  • 当缓冲区读完了,读阻塞
  • 从nil管道读取、写入数据,都会阻塞,不会崩溃
  • 从一个已经close的管道读取数据,返回零值,不会崩溃
  • 向一个已经close的管道写数据,会崩溃
  • 关闭一个已经close的管道,会崩溃
  • 读写次数,一定要对等
Go
Theme Jasmine by Kent Liao