-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhooks.go
399 lines (355 loc) · 12 KB
/
hooks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"runtime/debug"
"strings"
"time"
"github.com/mjl-/bstore"
)
var hookactivity = make(chan struct{})
var hooktokens chan struct{}
var hookClient = &http.Client{Transport: hookTransport()}
func kickHooksQueue() {
select {
case hookactivity <- struct{}{}:
default:
}
}
func hookTransport() *http.Transport {
t := http.DefaultTransport.(*http.Transport).Clone()
// We are not likely to talk to the host again soon.
t.IdleConnTimeout = 3 * time.Second
t.DisableKeepAlives = true
t.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
// We first resolve to IPs. If there are any fishy IPs, we fail the dial. Otherwise
// we do a regular dial.
// todo: should make sure we actually dial those same ip's. dial could do another lookup.
host, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, fmt.Errorf("parse dial address %q: %v", addr, err)
}
ips, err := net.DefaultResolver.LookupIP(ctx, strings.Replace(network, "tcp", "ip", 1), host)
if err != nil {
return nil, fmt.Errorf("looking up ips for host %q: %v", host, err)
}
for _, ip := range ips {
if !config.WebhooksAllowInternalIPs && (ip.IsLoopback() || ip.IsPrivate()) || ip.IsMulticast() || ip.IsUnspecified() {
return nil, fmt.Errorf("host %q resolves to ip %s, not allowed", host, ip)
}
}
d := net.Dialer{}
return d.DialContext(ctx, network, addr)
}
return t
}
// todo: make configurable?
const concurrentHooksMax = 10
// todo: group notifications for multiple versions of a single package in a single webhook call?
func deliverHooksInit() {
// Up to N concurrent webhook calls.
hooktokens = make(chan struct{}, concurrentHooksMax)
for i := 0; i < concurrentHooksMax; i++ {
hooktokens <- struct{}{}
}
}
func deliverHooks() {
deliverHooksInit()
// Wait a bit before making calls. If our process gets restarted in a loop, we
// don't want to hammer anyone too hard.
time.Sleep(3 * time.Second)
timer := time.NewTimer(0)
for {
// Fetch time of first next hook to process.
qn := bstore.QueryDB[Hook](context.Background(), database)
qn.FilterEqual("Done", false)
qn.FilterLessEqual("NextAttempt", time.Now())
qn.SortAsc("NextAttempt")
qn.Limit(1)
firsthook, err := qn.Get()
if err == bstore.ErrAbsent {
// No hook to handle, wait for something to happen.
slog.Debug("no hook to schedule, waiting for activity")
<-hookactivity
continue
}
if err != nil {
logFatalx("looking up next hook to deliver", err)
}
d := time.Until(firsthook.NextAttempt)
if d < 0 {
d = 0
}
timer.Reset(d)
select {
case <-hookactivity:
// Schedule again.
continue
case <-timer.C:
// Time to go!
}
// Retrieve all hooks we might be able to start.
deliverHooksOnce()
}
}
func deliverHooksOnce() {
qw := bstore.QueryDB[Hook](context.Background(), database)
qw.FilterEqual("Done", false)
qw.FilterLessEqual("NextAttempt", time.Now())
qw.SortAsc("NextAttempt")
qw.Limit(concurrentHooksMax)
hooks, err := qw.List()
if err != nil {
logFatalx("looking up next hooks to deliver", err)
}
if len(hooks) == 0 {
// Single scheduled hook could have been deleted in the mean time.
return
}
slog.Debug("found hooks to deliver", "nhooks", len(hooks))
for _, h := range hooks {
// Wait for token.
<-hooktokens
// Try to get delivered.
if err := prepareDeliverHook(h); err != nil {
slog.Error("delivering hook", "err", err, "hook", h.ID)
}
}
}
// Mark hook as busy delivering (setting NextAttempt and Attempts), fetch needed
// data. If all good, launch request in goroutine.
func prepareDeliverHook(h Hook) (rerr error) {
log := slog.With("hook", h.ID, "attempts", h.Attempts)
// Ensure we are returning the token if we end up not delivering.
var delivering bool
defer func() {
if !delivering {
hooktokens <- struct{}{}
kickHooksQueue()
}
}()
// Get hookconfig, for url. Increase attempts and set nextattempt in future early
// on, so we won't be hammering servers in failure modes.
var modup ModuleUpdate
var hc HookConfig
var nodeliver bool
err := database.Write(context.Background(), func(tx *bstore.Tx) error {
// Get fresh hook again, it could have vanished in the mean time.
if err := tx.Get(&h); err != nil {
return err
}
// We need the URL.
hc = HookConfig{ID: h.HookConfigID}
if err := tx.Get(&hc); err != nil {
return fmt.Errorf("lookup hook config: %v", err)
}
log = log.With("webhookconfig", hc.Name, "url", hc.URL)
// Get the ModuleUpdate, needed to form the payload.
var err error
modup, err = bstore.QueryTx[ModuleUpdate](tx).FilterNonzero(ModuleUpdate{HookID: h.ID}).Get()
if err != nil {
return fmt.Errorf("looking up moduleupdate: %v", err)
}
// Set next attempt before we start processing, so we at least won't keep
// processing the same hook in case of fatal errors.
// If hook config got disabled in the mean time, mark as done and failed.
if hc.Disabled {
log.Info("hook config is disabled, not delivering hook")
h.Done = true
h.Results = append(h.Results, HookResult{Error: "Hook config is disabled."})
nodeliver = true
return tx.Update(&h)
}
d := 15 * time.Minute / 2
for i := 0; i < h.Attempts; i++ {
d *= 2
}
h.Attempts++
h.NextAttempt = h.NextAttempt.Add(d)
// If we've seen a 429 in the past minute for this HookConfig, we postpone.
q := bstore.QueryTx[Hook](tx)
q.FilterNonzero(Hook{HookConfigID: hc.ID})
q.FilterGreater("LastResult", time.Now().Add(-time.Minute))
q.FilterFn(func(qh Hook) bool {
return len(qh.Results) > 0 && qh.Results[len(qh.Results)-1].StatusCode == http.StatusTooManyRequests
})
toofast, err := q.Exists()
if err != nil {
return fmt.Errorf(`checking for recent http "429 too many request" responses: %v`, err)
}
log = log.With("nextattempt", h.NextAttempt, "newattempts", h.Attempts)
if toofast {
log.Info("not delivering hook due to too many requests")
nodeliver = true
} else {
log.Debug("ready to deliver hook")
}
return tx.Update(&h)
})
if err != nil {
return fmt.Errorf("updating hook before delivery attempt: %v", err)
}
if nodeliver {
return nil
}
// Make request in goroutine. It is responsible for returning the token.
go deliverHook(h, hc, modup)
delivering = true
return nil
}
// todo: should we have signatures on our webhook calls? what would we protect against?
// Actually try to deliver.
func deliverHook(h Hook, hc HookConfig, modup ModuleUpdate) (rerr error) {
// rerr is only used in recover.
log := slog.With("hook", h.ID, "attempts", h.Attempts)
// Always return token.
defer func() {
hooktokens <- struct{}{}
kickHooksQueue()
}()
// Prevent crash for panic.
defer func() {
x := recover()
if x == nil {
return
}
metricPanics.Inc()
slog.Error("uncaught panic delivering hook", "x", x, "hook", h.ID)
debug.PrintStack()
}()
// If we encountered an error, we may disable the HookConfig automatically.
defer func() {
if !h.Done || len(h.Results) == 0 || h.Results[len(h.Results)-1].StatusCode/100 == 2 {
return
}
// Delivery failed. If the last 10 deliveries all failed, or if all deliveries in
// the past 7 days failed (minimum 2), we mark the hookconfig as disabled to
// prevent continued annoyance.
var n int
err := database.Write(context.Background(), func(tx *bstore.Tx) error {
q := bstore.QueryTx[Hook](tx)
q.FilterNonzero(Hook{HookConfigID: h.HookConfigID})
q.FilterGreater("LastResult", time.Now().Add(-7*24*time.Hour))
q.Limit(10)
err := q.ForEach(func(qh Hook) error {
if qh.Done && len(qh.Results) > 0 && qh.Results[len(qh.Results)-1].StatusCode/100 != 2 {
n++
}
if qh.Done {
// Successful delivery.
n = 0
return bstore.StopForEach
}
return nil
})
if err != nil {
return err
}
if n >= 2 {
// todo: should send an email about this, if possible according to rate limits and metasubscription.
log.Info("disabling webhook config due to repeated delivery failures")
hc = HookConfig{ID: hc.ID}
if err := tx.Get(&hc); err != nil {
return fmt.Errorf("get hook config: %v", err)
}
hc.Disabled = true
if err := tx.Update(&hc); err != nil {
return fmt.Errorf("disabling hook config: %v", err)
}
msg := "Disabling webhook config %q due to repeated delivery failures"
log.Info("adding log for user id", "userid", h.UserID, "msg", msg)
if err := tx.Insert(&UserLog{UserID: h.UserID, Text: msg}); err != nil {
return fmt.Errorf("inserting user log about disabling webhook config")
}
}
return nil
})
if err != nil {
log.Error("looking at disabling hook config after delivery failure", "err", err)
}
}()
// todo: should we do more checks against the URL we are about to request? eg have a blocklist of domains/ips (against abuse), or rate limit per destination domain?
// Turn a Go error into a HookResult and possibly mark as completely failed. These
// results are used by the defer funtion above. Not triggered for negatory HTTP
// responses.
defer func() {
if rerr == nil {
return
}
log.Debug("webhook request failed", "err", rerr)
metricHookResponse.WithLabelValues("error").Inc()
h.LastResult = time.Now()
h.Results = append(h.Results, HookResult{Start: h.LastResult, Error: rerr.Error()})
h.Done = h.Attempts >= 9
if err := database.Update(context.Background(), &h); err != nil {
log.Error("error while adding delivery error to hook", "err", err)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// todo: also send sum? parsed version? last known version we had? more fields?
data := hookData{modup.Module, modup.Version, modup.LogRecordID, modup.Discovered}
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("marshal hook json payload: %v", err)
}
// Request we'll be sending.
req, err := http.NewRequestWithContext(ctx, "POST", hc.URL, bytes.NewReader(payload))
if err != nil {
return fmt.Errorf("new http request: %v", err)
}
// Add headers. We use Set, not Add. Let's keep it simple.
for _, tup := range hc.Headers {
req.Header.Set(tup[0], tup[1])
}
// todo: store unsubscribe token in hookconfig, include it in user-agent, and allow website owner to enter it on /webhooks to disable future calls from this hook config.
req.Header.Set("User-Agent", fmt.Sprintf("gopherwatch/%s (see %s/webhooks)", version, config.BaseURL))
req.Header.Set("Content-Type", "application/json; charset=utf-8")
// Finally make the call.
start := time.Now()
resp, err := hookClient.Do(req)
metricHookRequest.Observe(float64(time.Since(start) / time.Second))
if err != nil {
return fmt.Errorf("http transaction: %v", err)
}
defer resp.Body.Close()
log.Debug("webhook response", "status", resp.StatusCode)
result := fmt.Sprintf("%dxx", resp.StatusCode/100)
metricHookResponse.WithLabelValues(result).Inc()
// Be nice and read a reasonable amount of response data. We don't want to consume
// much more. We don't care about errors, the status code is what matters.
respFragment, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
var response string
ct := strings.ToLower(strings.TrimSpace(strings.SplitN(resp.Header.Get("Content-Type"), ";", 2)[0]))
if ct == "text/plain" || ct == "application/json" {
response = string(respFragment)
}
h.LastResult = time.Now()
h.Results = append(h.Results, HookResult{
StatusCode: resp.StatusCode,
Start: start,
Response: response,
DurationMS: int64(h.LastResult.Sub(start) / time.Millisecond),
})
// Attempts: 0m, 7.5m, 15m, 30m, 1h, 2h, 4h, 8h, 16h; 9 total.
h.Done = resp.StatusCode/100 == 2 || resp.StatusCode == http.StatusForbidden || h.Attempts >= 9
// Store the result.
if err = database.Update(context.Background(), &h); err != nil {
// We're not returning an error anymore, we don't want to add another result.
log.Error("updating hook after delivery", "err", err)
}
return nil
}
// JSON payload we send in webhook.
type hookData struct {
Module string
Version string
LogRecordID int64
Discovered time.Time
}