-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpipeline.go
107 lines (94 loc) · 2.92 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Copyright © 2013-2018 Pierre Neidhardt <[email protected]>
// Use of this file is governed by the license that can be found in LICENSE.
package main
import (
"fmt"
"os"
"sync"
)
// Stage is the interface implemented by an object that can be added to a
// Pipeline to process incoming FileRecords.
//
// Multiple stages of the same kind can be run in parallel: Pipeline.Add builds
// one Stage value per goroutine, so Init and Close are each run once per
// goroutine.
type Stage interface {
	// Init is called once by a worker goroutine, before it processes its
	// first FileRecord.
	Init()
	// Run processes one FileRecord. When it returns a non-nil error the
	// record is sent to the Pipeline log queue instead of the next stage.
	Run(*FileRecord) error
	// Close is called once by a worker goroutine, after its input channel has
	// been closed and fully read.
	Close()
}
// Pipeline processes FileRecords through a sequence of Stages. A FileRecord is
// forwarded to the 'log' channel when a Stage Run() function returns an error,
// or to the 'output' channel otherwise.
//
// The pipeline design automates a few things:
// - It groups log messages by FileRecord; no manual flushing required.
// - It removes some parallelization boilerplate such as channel loops.
// - It makes it easy to change the number of goroutines allocated to the various stages.
type Pipeline struct {
	// input feeds the first stage. It is created by NewPipeline and is not
	// reassigned when stages are added.
	input chan *FileRecord
	// output carries the records that went through the last stage added. It
	// is the same channel as input until a stage is added.
	output chan *FileRecord
	// log receives the records for which a stage returned an error. The
	// goroutine started by NewPipeline prints them to standard error.
	log chan *FileRecord
	// logWg is released by the logging goroutine once the log channel has
	// been closed and fully read; Close waits on it.
	logWg sync.WaitGroup
}
// NewPipeline returns a Pipeline whose input queue buffers up to
// inputQueueSize FileRecords and whose log queue buffers up to logQueueSize
// FileRecords.
// No stage is attached yet, so the Pipeline output is its input. A logging
// goroutine is started right away: it prints every FileRecord received on the
// log queue to standard error until that queue is closed.
func NewPipeline(inputQueueSize, logQueueSize int) *Pipeline {
	queue := make(chan *FileRecord, inputQueueSize)
	// Build the Pipeline behind a pointer from the start so that the
	// WaitGroup it embeds is shared with the logging goroutine, not copied.
	p := &Pipeline{
		input:  queue,
		output: queue,
		log:    make(chan *FileRecord, logQueueSize),
	}

	p.logWg.Add(1)
	go func(records <-chan *FileRecord) {
		for record := range records {
			fmt.Fprint(os.Stderr, record)
		}
		// The log queue has been closed and emptied.
		p.logWg.Done()
	}(p.log)

	return p
}
// Add appends a new stage to the Pipeline.
// The Pipeline 'input' is left untouched; the new stage reads from the current
// 'output' and its own output channel becomes the Pipeline 'output'.
// The stage is run by 'routineCount' goroutines; when 'routineCount' is not >0
// the call does nothing. 'NewStage' is called once in each goroutine to build
// the Stage value used by that goroutine, which keeps the data of the
// goroutines separate and keeps the Stage interface implicit.
func (p *Pipeline) Add(NewStage func() Stage, routineCount int) {
	if routineCount < 1 {
		return
	}

	upstream := p.output
	// One slot per producing goroutine, so that a worker can always hand over
	// a record without waiting for its siblings.
	downstream := make(chan *FileRecord, routineCount)

	var workers sync.WaitGroup
	workers.Add(routineCount)

	// worker runs one private Stage value over the records of the previous
	// stage until that channel is closed and emptied.
	worker := func() {
		stage := NewStage()
		stage.Init()
		for record := range upstream {
			if err := stage.Run(record); err != nil {
				// Failed records leave the pipeline through the log queue.
				p.log <- record
			} else {
				downstream <- record
			}
		}
		stage.Close()
		workers.Done()
	}
	for remaining := routineCount; remaining > 0; remaining-- {
		go worker()
	}

	// The workers hold their own reference to the former output, so the
	// Pipeline output can now point to the new channel.
	p.output = downstream

	// Close the new output once every worker has returned, so that the
	// consumer of this stage sees the end of the stream.
	go func() {
		workers.Wait()
		close(downstream)
	}()
}
// Close shuts down the log queue and waits until the logging goroutine has
// printed the records still queued.
// Call it once the input has been fully produced and the output fully
// consumed: a stage reporting an error after Close would send on the closed
// log channel.
func (p *Pipeline) Close() {
	// Closing the channel ends the range loop of the logging goroutine once
	// the remaining records have been read.
	close(p.log)
	// Block until the logging goroutine signals that it is done.
	p.logWg.Wait()
}