📕
innohub
  • KEEP LEARNING
  • WebInfo
    • JS 部分运算符操作
    • javascript 中的object array
    • 错误处理以及异常捕获
    • JavaScript Bases
    • LaoZi & Confucius
  • PyInfo
    • Python3 输入与输出
    • Python3OS
    • python 修饰器的使用
    • python3 与mysql数据库连接使用实例
    • Format-specifier
    • CGI初学踩坑
    • Django 项目测试
    • Assert-info
    • 使用ngnix和uWSGI配置Django
    • write|SVN​
    • Matplotlib 基本使用
    • 重读 Python 官方文档
    • Python3 Base
    • python3 多线程
    • Python3 函数解析
    • python3 str 对象的基本操作
    • protocl buffers
    • Iterator-Generator
    • Django base
    • Decorator 2
    • Export to wheel or egg-info
    • 三. 运算、数据及逻辑
  • GoInfo
    • Info
      • Select 语句使用
      • First class function
      • Work Pools
      • StructTag
      • Go struct
      • 常用函数
      • Strings操作
      • Golang Bases
      • 数组以及切片
      • 文件操作
      • Golang 中的指针类型
      • Golang Map 类型
      • Reflection
      • 函数闭包
      • 接口
      • Panic and Recover
      • Go中的错误处理
      • 并发
      • defer usage
      • Method in golang
      • Object-oriented Programming
      • Goalng 包管理机制
  • RustInfo
    • Info
      • 包、crate、模块
      • Rust 中的错误处理
      • 智能指针
      • 泛型 generics
      • 数据布局与内存对齐
      • Functions and methods
      • 无畏并发
      • Actix-web 基本使用
      • Got from guessing game
      • 结构体的使用
      • Rust 中的函数式语言功能
      • 集合类型
      • 枚举以及模式匹配
      • Slice 类型
      • 生命周期
      • 测试
      • Rust 中的所有权
      • trait
      • 不安全 Rust
      • Format_print
      • Rust 通用编程概念
      • Macro
  • OS
    • info
      • 内存屏障 -- Part 1
      • 内存屏障 -- Part 2
      • CPU 上下文切换
      • 文件读写与零拷贝
      • ELF 文件
  • MySql
    • info
      • MySql 架构与历史
      • 02-key
  • kubernetes
    • 第二章 k8s 基本概念
    • 第一章 Kubernetes -- 伟大舵手
  • Redis
    • info
      • Redis 总览
      • 02-underline-ds
  • Shell&CInfo
    • GCC 与make的使用
    • C++ 中 malloc 与 new
    • 位运算符操作
    • Base of C macro
    • C 语言中 extern 关键字的用法
  • Distributed
    • info
      • 分布式理论概述
  • Java
    • info
      • Java 内存模型
  • Prometheus
    • Prometheus -- 不灭之火
Powered by GitBook
On this page
  • 1. Wait Group
  • 2. Worker Pool implementation
  • 2.1 线程池
  • 2.2 具体实现

Was this helpful?

  1. GoInfo
  2. Info

Work Pools

[TOC]

1. Wait Group

等待组是一个用以维护Groutines的一个集合,用以等待其中所有的routines执行结束并且返回。对于wait group的控制是阻塞的,当存在routine没有执行结束。对于单个routine的等待可以按照前面的方法,使用channel实现,对于多个routines,可以使用sync.WaitGroup结构体得到更为简洁的实现。假设现在有三个并发执行的Goroutines从main routine催生出来:

  • 使用wg.Done()结束一个routine的等待

  • 使用wg.add(a int)增加需要等待的routine数量

  • 在主进程使用wg.wait(),等待所有的routine返回

  • 每一个routine都需要使用WaitGroup的指针形式作为参数

/*Simple program to use WaitGroup*/
package main

import (
  "fmt"
  "sync"
  "time"
)

// The routine spawned from main routine
func process(i int, wg *sync.WaitGroup) {
  fmt.Println("started Goroutine", i)
  time.Sleep(1 * time.Second)
  fmt.Println("Goroutine", i, "done.")
  wg.Done()
}

func main() {
  no := 5
  var wg sync.WaitGroup
  for i := 0; i < no; i++ {
    wg.Add(1)
    go process(i, &wg)
  }
  wg.Wait()
  fmt.Println("All the routines finished")
}



/*started Goroutine 4
started Goroutine 2
started Goroutine 3
started Goroutine 0
started Goroutine 1
Goroutine 0 done.
Goroutine 4 done.
Goroutine 1 done.
Goroutine 3 done.
Goroutine 2 done.
All the routines finished
*/

WaitGroup结构体对象等待一个routines执行结束集合返回,维护了一个计数器,调用Add()就增加计数器的数值,调用Done()就将该计数值减一,Wait()方法阻塞该对象直到计数器为0。由于先后开启了5个routines,输出时可能比较混乱,但是等待相同时间后,输出就按照顺序。

注意使用WaitGroup时,每一个routine必须传入其指针形式的参数,按值传递的话,进行了复制,主routine无法获得该对象的信息。

2. Worker Pool implementation

带有缓存区的channel的一个重要使用场景是实现线程池(Threads pool, work pool).

2.1 线程池

线程池是操作系统或者计算机程序中用来维护多个线程,这些线程等待执行不同的任务,从而实现程序的并发操作。线程池由控制系统进行管理,主要负责线程数量的确定,线程的创建以及销毁。通过维护一个已经存在的线程池,可以减少线程创建以及销毁的开销,相比于每有一个任务就创建一个线程,结束即销毁的方式由很大优势。

为线程池调度任务的一个常用方法是维护一个同步队列。线程池中的线程移除任务队列中的任务,执行完毕后将其加入完成队列.一旦一个进程完成了任务就可以等待下一个任务。

我们需要实现的worker pool所需要的主要功能:

  • 创建一个Goroutines pool,用以监听一个有缓冲区的channel作为工作集的输入

  • 当一个任务结束后,将结果写出到一个buffered channel

  • 可以从output channel读取结果

2.2 具体实现

先定义工作以及输出结果的结构体类型,并且定义传输Jobs的input channel以及传输result的output channel:

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

由于需要实现一个线程池,其中所有的线程需要使用sync.WaitGroup对象进行阻塞等待。所以创建线程池时,根据需要使用的线程数量,添加Goroutines到定义好的WaitGroup中。并且将WaitGroup对象的指针作为routines的参数进行传递。

每一个线程的具体工作是从Jobschannel接收工作,执行该工作,得到Result并且写入Results中去。

func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

根据任务数量分配任务,并且创建输出结果的routine

func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

使用更多的Routine作为workers,可以得到近似的线性性能提升。

PreviousFirst class functionNextStructTag

Last updated 5 years ago

Was this helpful?

thread pool