-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpipeline-routes.go
213 lines (173 loc) · 5.52 KB
/
pipeline-routes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package main
import (
"net/http"
"strconv"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/polygon-io/errands-server/schemas"
"github.com/polygon-io/errands-server/utils"
)
// Names of the request parameters read by the pipeline route handlers.
const (
	// idParam is the route parameter holding the ID of the pipeline that a
	// get or delete request targets.
	idParam = "id"

	// dryRunQueryParam is the query parameter that, when it parses as true,
	// makes pipeline creation stop after validation.
	dryRunQueryParam = "dryRun"

	// statusFilterQueryParam is the query parameter that restricts the
	// pipeline listing to pipelines with the given status.
	statusFilterQueryParam = "status"
)
// createPipeline handles a pipeline creation request.
//
// The JSON request body is bound to a schemas.Pipeline and then validated;
// either failure produces a 400 response. If the dryRun query parameter parses
// as true the handler stops after validation and reports success without
// storing anything. A dryRun value that is present but is not a valid boolean
// is rejected with a 400 instead of being silently treated as false, so a
// mistyped dry-run request can never create a real pipeline.
//
// Otherwise the pipeline is given a fresh UUID, every errand is marked blocked
// and passed to saveErrand, the errands returned by GetUnblockedErrands are
// switched to inactive, written to the errand store and announced with a
// "created" notification, and finally the pipeline is written to the pipeline
// store and echoed back in the response.
func (s *ErrandsServer) createPipeline(c *gin.Context) {
	var pipeline schemas.Pipeline
	if err := c.ShouldBindJSON(&pipeline); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{
			"message": "pipeline validation failed",
			"error":   err.Error(),
		})
		return
	}

	if err := pipeline.Validate(); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{
			"message": "pipeline validation failed",
			"error":   err.Error(),
		})
		return
	}

	// An absent or empty dryRun parameter means "not a dry run". Anything else
	// must be a valid boolean: guessing "false" for garbage input would create
	// a pipeline the caller only wanted validated.
	dryRun := false
	if rawDryRun := c.Query(dryRunQueryParam); rawDryRun != "" {
		parsed, err := strconv.ParseBool(rawDryRun)
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{
				"message": "invalid " + dryRunQueryParam + " query parameter",
				"error":   err.Error(),
			})
			return
		}
		dryRun = parsed
	}

	// If this was just a dry run, we're done
	if dryRun {
		c.JSON(http.StatusOK, gin.H{"message": "pipeline validated successfully"})
		return
	}

	// Set the ID and status of the pipeline
	pipeline.ID = uuid.New().String()
	pipeline.Status = schemas.StatusInactive
	pipeline.StartedMillis = utils.GetTimestamp()

	// Initialize all the errands in the pipeline. Every errand starts out
	// blocked; the ones that are free to run are released in the next loop.
	for _, errand := range pipeline.Errands {
		errand.SetDefaults()
		errand.PipelineID = pipeline.ID
		errand.Status = schemas.StatusBlocked

		if err := s.saveErrand(errand); err != nil {
			// This should never really happen
			log.WithError(err).Error("error saving errand")
			c.JSON(http.StatusInternalServerError, gin.H{"message": "internal store error"})
			return
		}
	}

	// Kick off all the unblocked errands now
	for _, unblockedErrand := range pipeline.GetUnblockedErrands() {
		unblockedErrand.Status = schemas.StatusInactive
		s.ErrandStore.SetDefault(unblockedErrand.ID, *unblockedErrand)
		s.AddNotification("created", unblockedErrand)
	}

	s.PipelineStore.SetDefault(pipeline.ID, pipeline)

	c.JSON(http.StatusOK, gin.H{
		"status":  "OK",
		"results": pipeline,
	})
}
// deletePipeline handles a request to remove the pipeline named by the id
// route parameter. It responds 404 with status "not_found" when the pipeline
// is not in the store; otherwise it cascade-deletes the pipeline and responds
// 200 with status "OK".
func (s *ErrandsServer) deletePipeline(c *gin.Context) {
	target, found := s.getPipelineFromStore(c.Param(idParam))
	if !found {
		c.JSON(http.StatusNotFound, gin.H{"status": "not_found"})
		return
	}

	s.cascadeDeletePipeline(target)

	okBody := gin.H{"status": "OK"}
	c.JSON(http.StatusOK, okBody)
}
// getPipeline handles a request for the pipeline named by the id route
// parameter. It responds 404 with status "not_found" when the pipeline is not
// in the store; otherwise it responds 200 with the pipeline under "results".
func (s *ErrandsServer) getPipeline(c *gin.Context) {
	found, ok := s.getPipelineFromStore(c.Param(idParam))
	if !ok {
		c.JSON(http.StatusNotFound, gin.H{"status": "not_found"})
		return
	}

	body := gin.H{
		"status":  "OK",
		"results": found,
	}
	c.JSON(http.StatusOK, body)
}
// listPipelines handles a request for an overview of the stored pipelines.
// When the status query parameter is non-empty only pipelines with that status
// are returned; otherwise every pipeline is returned.
func (s *ErrandsServer) listPipelines(c *gin.Context) {
	// Start from the accept-everything filter and narrow it only if the caller
	// asked for a specific status.
	keep := acceptAllPipelineFilter
	if wanted := c.Query(statusFilterQueryParam); wanted != "" {
		keep = statusPipelineFilter(schemas.Status(wanted))
	}

	summaries := s.getFilteredPipelinesFromStore(keep)

	// The list API is an overview: drop the errands and dependencies of each
	// pipeline to keep the response small. Full details remain available
	// through the get-by-ID route.
	for idx := range summaries {
		summary := &summaries[idx]
		summary.Errands, summary.Dependencies = nil, nil
	}

	body := gin.H{
		"status":  "OK",
		"results": summaries,
	}
	c.JSON(http.StatusOK, body)
}
// cascadeDeletePipeline removes a pipeline together with its errands: each
// errand listed on the pipeline is deleted by ID first, then the pipeline's own
// entry is deleted from the pipeline store.
func (s *ErrandsServer) cascadeDeletePipeline(pipeline schemas.Pipeline) {
	// Errands go first.
	for idx := range pipeline.Errands {
		s.deleteErrandByID(pipeline.Errands[idx].ID)
	}

	// Then the pipeline entry itself.
	s.PipelineStore.Delete(pipeline.ID)
}
// getPipelineFromStore looks up a pipeline by ID in the pipeline store. It
// returns the pipeline and true when an entry exists, or a zero-value pipeline
// and false when it does not.
func (s *ErrandsServer) getPipelineFromStore(pipelineID string) (schemas.Pipeline, bool) {
	if stored, found := s.PipelineStore.Get(pipelineID); found {
		// Store values are untyped; entries are asserted to schemas.Pipeline.
		return stored.(schemas.Pipeline), true
	}

	return schemas.Pipeline{}, false
}
// getFilteredPipelinesFromStore returns every pipeline in the pipeline store
// for which filterFn returns true.
//
// The returned slice is never nil, even when nothing matches. A nil slice is
// encoded by encoding/json as null, so returning an empty slice makes the list
// API answer with "results": [] rather than "results": null.
func (s *ErrandsServer) getFilteredPipelinesFromStore(filterFn func(pipeline schemas.Pipeline) bool) []schemas.Pipeline {
	items := s.PipelineStore.Items()

	// Sized for the case where every stored pipeline passes the filter.
	results := make([]schemas.Pipeline, 0, len(items))

	for _, pipelineItem := range items {
		pipeline := pipelineItem.Object.(schemas.Pipeline)
		if filterFn(pipeline) {
			results = append(results, pipeline)
		}
	}

	return results
}
// statusPipelineFilter builds a pipeline filter that accepts exactly those
// pipelines whose Status equals the given status.
func statusPipelineFilter(status schemas.Status) func(pipeline schemas.Pipeline) bool {
	matchesStatus := func(candidate schemas.Pipeline) bool {
		return candidate.Status == status
	}

	return matchesStatus
}
// acceptAllPipelineFilter is the pipeline filter that accepts every pipeline;
// its argument is ignored.
func acceptAllPipelineFilter(_ schemas.Pipeline) bool {
	return true
}
// updateErrandInPipeline folds an updated errand back into the pipeline named
// by its PipelineID. It does nothing when that pipeline is not in the store.
//
// Otherwise it replaces the pipeline's entry for the errand, moves every
// errand returned by GetUnblockedErrands that is still marked blocked to
// inactive (logging the transition on the errand and writing it to the errand
// store), recalculates the pipeline status, and then either cascade-deletes
// the pipeline (completed and DeleteOnCompleted set) or saves it back to the
// pipeline store.
func (s *ErrandsServer) updateErrandInPipeline(errand *schemas.Errand) {
	pipeline, found := s.getPipelineFromStore(errand.PipelineID)
	if !found {
		return
	}

	// Swap the updated errand in for the pipeline's entry with the same ID.
	for idx := range pipeline.Errands {
		if errand.ID != pipeline.Errands[idx].ID {
			continue
		}

		pipeline.Errands[idx] = errand

		break
	}

	// Release errands that have just become unblocked. Anything no longer
	// marked blocked is already in progress and is left alone.
	for _, candidate := range pipeline.GetUnblockedErrands() {
		if candidate.Status == schemas.StatusBlocked {
			candidate.Status = schemas.StatusInactive

			if logErr := candidate.AddToLogs("INFO", "errand unblocked"); logErr != nil {
				// A failed log entry is reported but does not stop the update.
				log.WithError(logErr).Error("unable to add to errand logs")
			}

			s.ErrandStore.SetDefault(candidate.ID, *candidate)
		}
	}

	pipeline.RecalculateStatus()

	// A pipeline that just finished and is flagged DeleteOnCompleted is removed
	// instead of being saved.
	removeNow := pipeline.Status == schemas.StatusCompleted && pipeline.DeleteOnCompleted
	if removeNow {
		s.cascadeDeletePipeline(pipeline)
		return
	}

	s.PipelineStore.SetDefault(pipeline.ID, pipeline)
}