para.go (1826B)
1 // Copyright 2019 The Hugo Authors. All rights reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 // Package para implements parallel execution helpers.
15 package para
16
17 import (
18 "context"
19
20 "golang.org/x/sync/errgroup"
21 )
22
// Workers is a shared pool of execution slots. It caps how many tasks may
// run in parallel across the Runners started from it.
type Workers struct {
	// sem is used as a counting semaphore: a running task holds one element
	// for as long as it executes, and the channel capacity is the limit.
	sem chan struct{}
}
27
// Runner wraps the lifecycle methods of a new task set.
type Runner interface {
	// Run will block until a worker is available or the context is
	// cancelled. When a worker becomes available, fn is run in a new
	// goroutine.
	Run(fn func() error)

	// Wait will wait for all the running goroutines to finish.
	Wait() error
}
37
38 type errGroupRunner struct {
39 *errgroup.Group
40 w *Workers
41 ctx context.Context
42 }
43
44 func (g *errGroupRunner) Run(fn func() error) {
45 select {
46 case g.w.sem <- struct{}{}:
47 case <-g.ctx.Done():
48 return
49 }
50
51 g.Go(func() error {
52 err := fn()
53 <-g.w.sem
54 return err
55 })
56 }
57
58 // New creates a new Workers with the given number of workers.
59 func New(numWorkers int) *Workers {
60 return &Workers{
61 sem: make(chan struct{}, numWorkers),
62 }
63 }
64
65 // Start starts a new Runner.
66 func (w *Workers) Start(ctx context.Context) (Runner, context.Context) {
67 g, ctx := errgroup.WithContext(ctx)
68 return &errGroupRunner{
69 Group: g,
70 ctx: ctx,
71 w: w,
72 }, ctx
73 }