-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.go
93 lines (75 loc) · 2.33 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/VictoriaMetrics/metrics"
"github.com/knadh/koanf/v2"
"github.com/zerodha/kaf-relay/internal/relay"
)
// buildString is printed once at startup. It defaults to "unknown".
// NOTE(review): presumably overridden at build time (e.g. via -ldflags -X) — confirm against the build scripts.
var buildString = "unknown"

// ko is the process-wide koanf instance, created with "." as the key path
// delimiter. The init* helpers called from main read flags and config through it.
var ko = koanf.New(".")
// main wires up the relay and runs it.
//
// Order of operations: parse flags, print the build string (exiting right
// away when the "version" flag is set), load config, set up logging and
// filters, start the metrics HTTP server, build the target, the source pool
// and the relay, then block in Start on a context that is cancelled by
// SIGINT or SIGTERM.
//
// Any initialization failure ends the process through log.Fatalf, which
// exits immediately and therefore skips the deferred cleanup below.
func main() {
	// Initialize CLI flags.
	initFlags(ko)

	// The build string is always printed; with the "version" flag set nothing
	// else happens.
	fmt.Println(buildString)
	if ko.Bool("version") {
		os.Exit(0)
	}

	// Read config files.
	initConfig(ko)

	// Initialize the structured logger.
	lo := initLog(ko)

	// Load the optional filter providers.
	filters, err := initFilters(ko, lo)
	if err != nil {
		log.Fatalf("error initializing filter provider: %v", err)
	}

	// Create a global context that is cancelled on interrupt signals.
	globalCtx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	// Initialize the metrics set and start the HTTP server in the background.
	metr := metrics.NewSet()
	metrSrv := initMetricsServer(metr, ko)
	go func() {
		// ErrServerClosed is the expected result once Shutdown has been called.
		if err := metrSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Printf("error starting server: %v", err)
		}
	}()
	defer func() {
		// Release the signal registration first so that a second
		// SIGINT/SIGTERM gets the default (terminating) behaviour if the
		// shutdown below stalls. Calling cancel more than once is harmless.
		cancel()

		// Use a fresh context here: on the normal exit path globalCtx is
		// expected to be cancelled already (presumably a signal is what makes
		// Start return), and http.Server.Shutdown stops waiting for in-flight
		// requests as soon as its context is done.
		// NOTE(review): assumes metrSrv is a net/http server — confirm in initMetricsServer.
		if err := metrSrv.Shutdown(context.Background()); err != nil {
			log.Printf("error shutting down metrics server: %v", err)
		}
	}()

	// Initialize the source and target Kafka config.
	consumerCfgs, prodConfig := initKafkaConfig(ko)

	// Initialize the source:target topic map from config.
	topics := initTopicsMap(ko)

	// Initialize the target Kafka (producer) relay.
	target, err := relay.NewTarget(globalCtx, initTargetConfig(ko), prodConfig, topics, metr, lo)
	if err != nil {
		log.Fatalf("error initializing target controller: %v", err)
	}

	// Fetch the target's high watermark; its offsets are handed to the source pool.
	hOf, err := target.GetHighWatermark()
	if err != nil {
		log.Fatalf("error getting destination high watermark: %v", err)
	}

	// Initialize the source Kafka (consumer) relay.
	srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, hOf.KOffsets(), metr, lo)
	if err != nil {
		log.Fatalf("error initializing source pool controller: %v", err)
	}

	// Initialize the Relay which orchestrates consumption from the source pool
	// and writing to the target. The local is named rl so that it does not
	// shadow the imported relay package.
	rl, err := relay.NewRelay(initRelayConfig(ko), srcPool, target, topics, filters, lo)
	if err != nil {
		log.Fatalf("error initializing relay controller: %v", err)
	}

	// Start the relay. This is an indefinitely blocking call.
	if err := rl.Start(globalCtx); err != nil {
		log.Fatalf("error starting relay controller: %v", err)
	}

	lo.Info("bye")
}