做家装网站客户来源多吗,企业oa系统价格,wordpress导航菜单加小图标,简述企业网站的基本功能背景(Why)
Go语言通过其内置的goroutine和通道#xff08;channel#xff09;机制#xff0c;提供了强大的并发支持。goroutine的开销非常低#xff0c;一个goroutine仅占用几KB的内存#xff0c;可以轻松创建成千上万个goroutine来处理并发任务。然而#xff0c;随着并…背景(Why)
Go语言通过其内置的goroutine和通道channel机制提供了强大的并发支持。goroutine的开销非常低一个goroutine仅占用几KB的内存可以轻松创建成千上万个goroutine来处理并发任务。然而随着并发任务数量的增加管理goroutine的生命周期、处理错误以及保证资源正确回收变得越来越复杂。例如我们需要处理以下场景
错误处理困难如果某个goroutine发生错误或panic需要有机制捕获这些错误并作出相应处理。资源管理复杂确保所有goroutine在完成任务后正确回收资源防止资源泄漏。任务调度不灵活在多个goroutine之间调度任务确保高效执行和公平分配。在goroutine执行前后进行必要的操作如日志记录或环境准备。同步复杂性确保所有goroutine都在某个时间点前完成或者在发生重大错误时取消所有未完成的goroutine。
为了解决这些问题引入了一个Group结构体提供了一种更高级的方式来管理一组goroutine。
What
定义一个Group结构体来实现goroutine组管理
type Group struct {chs []func(ctx context.Context) error // 保存所有要在组中执行的任务name string // 组名err error // 保存组中发生的第一个错误ctx context.Context // 组的上下文用于控制任务的执行panicCb func([]byte) bool // 在发生 panic 时调用的回调函数beforeCb func() // 在任务执行之前调用的回调函数panicTimeout time.Duration // 调用 panicCb 之间的时间间隔ch chan func(ctx context.Context) error // 任务通道cancel func() // 取消任务的函数wg sync.WaitGroup // 等待组内所有任务完成errOnce sync.Once // 确保 err 只被设置一次workerOnce sync.Once // 确保 worker 只被启动一次panicTimes int8 // 最大允许 panic 的次数
}chs []func(ctx context.Context) error 类型切片包含多个函数这些函数接受 context.Context 作为参数并返回错误。用途保存所有要在组中执行的任务。 name string 类型字符串。用途保存组的名称。 err error 类型错误。用途保存组中第一个发生的错误。 ctx context.Context 类型上下文。用途控制任务的执行可以用来取消任务或者设置任务的超时时间。 panicCb func([]byte) bool 类型函数接受一个字节切片参数panic 的堆栈信息并返回布尔值。用途当组中的任务发生 panic 时调用的回调函数。 beforeCb func() 类型函数无参数无返回值。用途在每个任务执行之前调用的回调函数。 panicTimeout time.Duration 类型持续时间。用途两次调用 panicCb 之间的时间间隔。如果某个任务频繁地发生 panic而每次 panic 都调用 panicCb这可能会导致系统性能下降或产生大量日志。通过设置 panicTimeout可以限制 panicCb 的调用频率确保在一个指定的时间间隔内不会多次调用 panicCb。 ch chan func(ctx context.Context) error 类型通道包含函数这些函数接受 context.Context 作为参数并返回错误。用途用于在组内传递任务。 cancel func() 类型函数无参数无返回值。用途用于取消组内的所有任务。 wg sync.WaitGroup 类型等待组。用途用于等待组内所有任务完成。 errOnce sync.Once 类型同步 Once。用途确保 err 只被设置一次。 workerOnce sync.Once 类型同步 Once。用途确保 worker 只被启动一次。 panicTimes int8 类型整数8位。用途最大允许的 panic 次数。 创建NewGroup函数
NewGroup函数用于创建一个新的goroutine组实例初始化相关参数并设置panic处理回调函数。
func NewGroup(option Option) *Group {log logger.SLogger(goroutine)name : defaultif len(option.Name) 0 {name option.Name}g : Group{name: name,panicCb: option.PanicCb,panicTimes: option.PanicTimes,panicTimeout: option.PanicTimeout,}//如果 option 中未提供 panicCb则使用默认的 panicCb 回调函数。这个函数会记录 panic 的信息并增加 goroutineCrashedVec 指标。if g.panicCb nil {g.panicCb func(crashStack []byte) bool {log.Errorf(recover panic: %s, string(crashStack))goroutineCrashedVec.WithLabelValues(name).Inc()return true}}goroutineGroups.Inc()return g
}
3.定义GOMAXPROCS 方法
GOMAXPROCS 函数用于设置并发执行的最大 goroutine 数量。具体来说它通过创建一个缓冲通道来限制并发执行的 goroutine 数量并启动相应数量的 goroutine 来处理通道中的任务。
// GOMAXPROCS set max goroutine to work.
func (g *Group) GOMAXPROCS(n int) {if n 0 {panic(goroutine: GOMAXPROCS must great than 0)}g.workerOnce.Do(func() { // 确保该逻辑只执行一次g.ch make(chan func(context.Context) error, n) // 创建缓冲通道大小为 nfor i : 0; i n; i { // 启动 n 个 goroutine 来处理通道中的任务go func() {for f : range g.ch {g.do(f) // 调用 g.do 方法执行任务}}()}})
}使用 sync.Once 确保逻辑只执行一次。创建一个缓冲大小为 n 的通道 g.ch用于存储任务。
启动 n 个 goroutine循环从通道 g.ch 中获取任务并执行 g.do(f) 方法。每个 goroutine 都会持续从通道中获取任务并执行直到通道被关闭。
在 for f : range g.ch 这种结构中如果通道 g.ch 中没有任务读取操作将会阻塞直到有新的任务被写入通道。 也就是说开了n个goroutine在g.ch中等待任务发放和执行任务所以最大并发的goroutine数量为n。某个goroutine从通道 g.ch 中取出的任务 f 不会在另一个 goroutine 的循环中再次出现每个任务只会被一个 goroutine 处理一次。
4. 定义Go方法
Go方法用于启动一个新的goroutine并将其添加到组中进行管理。如果Group已经初始化了工作通道,也就是如果有通道 g.ch则尝试将任务发送到通道如果通道已满无法立即发送则将函数 f 添加到 g.chs 列表中等待稍后执行。如果没有通道 g.ch则立即启动一个新的 goroutine 来执行任务。
func (g *Group) Go(f func(ctx context.Context) error) {g.wg.Add(1)goroutineCounterVec.WithLabelValues(g.name).Inc()if g.ch ! nil {select {case g.ch - f:default:g.chs append(g.chs, f)}return}go g.do(f)
}使用通道 g.ch 来限制同时运行的 goroutine 数量。当通道已满时新的任务会被暂存到 g.chs 列表中。如果没有设置并发限制即 g.ch 为 nil则每次调用 Go 方法都会立即启动一个新的 goroutine 来执行任务。也就是提供了两种模式可供选择
4. 定义Wait方法
Wait 方法用于等待所有通过 Go 方法启动的 goroutine 完成执行并返回第一个非空错误如果有。
func (g *Group) Wait() error {if g.ch ! nil {for _, f : range g.chs {g.ch - f}}g.wg.Wait()if g.ch ! nil {close(g.ch) // let all receiver exit}if g.cancel ! nil {g.cancel()}return g.err
}Wait 方法的设计确保了所有通过 Go 方法启动的 goroutine 都能够正确完成执行并清理所有相关的资源。如果有任何 goroutine 返回错误该方法会返回第一个非空错误。这个方法提供了一种优雅的方式来管理并发任务的生命周期和错误处理。
5. 定义具体执行任务的方法do方法
do方法负责在 goroutine 中执行任务并处理可能发生的 panic。do方法执行传入的任务f。如果任务中发生panicdo方法会根据配置的重试次数进行重试并调用panicCb回调函数。
func (g *Group) do(f func(ctx context.Context) error) {//如果定义了 beforeCb 回调函数调用它。这可以在每次任务开始前执行一些操作如初始化工作或记录日志。if g.beforeCb ! nil {g.beforeCb()}//初始化上下文ctx : g.ctxif ctx nil {ctx context.Background()}//设定重试次数为 g.panicTimes - 1。在 do 方法内部可能会递减该值来控制 panic 的重试逻辑。panicTimes : g.panicTimes - 1var (err error//run 是一个匿名函数用于执行传入的任务 f(ctx)并在任务完成后进行错误处理和资源清理。run func()start time.Now())run func() {//通过 recover 捕获 panic 信息并将堆栈信息存储在 buf 中,记录错误信息并根据 panicCb 回调函数的返回值决定是否重试。defer func() {if r : recover(); r ! nil {goroutineCrashedVec.WithLabelValues(g.name).Inc()isPanicRetry : truebuf : make([]byte, 4096) //nolint:gomndbuf buf[:runtime.Stack(buf, false)]if e, ok : r.(error); ok {buf append([]byte(fmt.Sprintf(%s\n, e.Error())), buf...)}if g.panicCb ! nil {isPanicRetry g.panicCb(buf)}//如果 panicCb 回调函数定义了调用它并判断是否继续重试。if isPanicRetry panicTimes 0 {panicTimes--if g.panicTimeout 0 {time.Sleep(g.panicTimeout)}goroutineRecoverVec.WithLabelValues(g.name).Inc()//重试执行函数run()return} else {//如果重试次数用完了则更新监控指标记录 panic 发生的次数和恢复的次数。goroutineCounterVec.WithLabelValues(g.name).Dec()goroutineCostVec.WithLabelValues(g.name).Observe(float64(time.Since(start)) / float64(time.Second))goroutineStoppedVec.WithLabelValues(g.name).Inc()}err fmt.Errorf(goroutine: panic recovered: %s, r)} else {//没有发生panic则只用记录指标goroutineCounterVec.WithLabelValues(g.name).Dec()goroutineCostVec.WithLabelValues(g.name).Observe(float64(time.Since(start)) / float64(time.Second))goroutineStoppedVec.WithLabelValues(g.name).Inc()}//如果有err则记录在g实例的字段中if err ! nil {g.errOnce.Do(func() {g.err errif g.cancel ! nil {g.cancel()}})}g.wg.Done()}()err f(ctx)}run()
}HOW
下面是一个使用Group管理goroutine的示例代码
func demoFunc(){fmt.Println(finish)
}
func main() {group : NewGroup(Option{Name: example-group,PanicCb: nil, // 使用默认的panic处理回调PanicTimes: 3, // 最大重试次数PanicTimeout: time.Second * 2, // 重试间隔})// 在这个group中启动5个goroutine执行任务即增加五个func到group.chfor i : 0; i 5; i {group.Go(func(ctx context.Context) error {// 在这里放入你要执行的函数(任务)demoFunc()return nil})}// 等待所有任务完成if err : group.Wait(); err ! nil {fmt.Printf(group execution completed with error: %v\n, err)} else {fmt.Println(group execution completed successfully)}
}