-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathprocess.go
65 lines (60 loc) · 1.81 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package pipeline
import (
"context"
"github.com/deliveryhero/pipeline/v2/semaphore"
)
// Process reads inputs from `in` one at a time and hands each of them to the `processor`.
// Every `Output` returned by `Processor.Process` is sent on the returned `<-chan Output`.
// An input whose `Processor.Process` call fails is passed to `Processor.Cancel` together with the error.
// After the `Context` is canceled, inputs still arriving on `in` are passed to `Processor.Cancel` instead of being processed.
// The returned channel is closed once `in` has been closed and every input has been handled.
func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output {
	results := make(chan Output)

	// worker handles inputs sequentially, so results keep the order of the inputs.
	worker := func() {
		for {
			input, open := <-in
			if !open {
				break
			}
			process(ctx, processor, input, results)
		}
		// No more inputs: signal completion to the consumer.
		close(results)
	}
	go worker()

	return results
}
// ProcessConcurrently fans the in channel out to multiple Processors running concurrently,
// then it fans the out channels of the Processors back into a single out chan.
//
// Every input read from in is handled in its own goroutine. A semaphore created with
// semaphore.New(concurrently) is acquired before each goroutine starts and released when
// it finishes; it is presumably what caps the number of in-flight inputs at concurrently
// (confirm against the semaphore package). Because inputs are handled concurrently,
// outputs may arrive in a different order than their inputs.
//
// The returned channel is closed after in has been closed and all started goroutines
// have released the semaphore.
func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], in <-chan Input) <-chan Output {
	// Create the out chan
	out := make(chan Output)
	go func() {
		// Perform Process concurrently times
		sem := semaphore.New(concurrently)
		for i := range in {
			sem.Add(1)
			go func(i Input) {
				// Release the slot with defer so it is returned even when the
				// Processor leaves this goroutine abnormally (e.g. runtime.Goexit);
				// otherwise sem.Wait below would block forever and out would never close.
				defer sem.Done()
				process(ctx, p, i, out)
			}(i)
		}
		// Close the out chan after all of the Processors finish executing
		sem.Wait()
		close(out)
	}()
	return out
}
// process handles a single input i with the given processor.
//
// If ctx is already canceled, i is passed to processor.Cancel with ctx.Err() and
// processor.Process is not called. Otherwise processor.Process is called: on error,
// i is passed to processor.Cancel with that error; on success the result is sent to out.
//
// The send to out never blocks past cancellation. If no receiver is ready and ctx is
// canceled, the result is dropped and i is passed to processor.Cancel with ctx.Err(),
// so the calling goroutine cannot be stuck forever on an abandoned out channel.
func process[A, B any](
	ctx context.Context,
	processor Processor[A, B],
	i A,
	out chan<- B,
) {
	select {
	// When the context is canceled, Cancel all inputs
	case <-ctx.Done():
		processor.Cancel(i, ctx.Err())
		return
	// Otherwise, Process all inputs
	default:
	}

	result, err := processor.Process(ctx, i)
	if err != nil {
		processor.Cancel(i, err)
		return
	}

	// Fast path: if a receiver is ready right now, deliver the result even when
	// ctx has been canceled in the meantime, so finished work is not thrown away.
	select {
	case out <- result:
		return
	default:
	}

	// Slow path: wait for a receiver, but stop waiting once ctx is canceled.
	// A consumer that stopped reading after cancellation would otherwise block
	// this goroutine forever.
	select {
	case out <- result:
	case <-ctx.Done():
		processor.Cancel(i, ctx.Err())
	}
}