From ee563f0bfe0507a9eeb79c1ef6507e83b077b609 Mon Sep 17 00:00:00 2001 From: Lakshay Kalbhor Date: Tue, 27 Aug 2024 11:32:49 +0530 Subject: [PATCH 1/2] feat: optional otel tracing --- go.mod | 5 ++-- go.sum | 2 ++ internal/core/core.go | 65 ++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index c753920..6f7e5ed 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,9 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 github.com/vmihailenco/msgpack v4.0.4+incompatible + go.opentelemetry.io/otel v1.24.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0 + go.opentelemetry.io/otel/sdk v1.24.0 ) require ( @@ -41,9 +44,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect - go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect diff --git a/go.sum b/go.sum index 3c7431b..aac7c55 100644 --- a/go.sum +++ b/go.sum @@ -98,6 +98,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0 h1:0uV0qzHk48i1SF8qRI8odMYiwPOLh9gBhiJFpj8H6JY= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0/go.mod h1:Fl1iS5ZhWgXXXTdJMuBSVsS5nkL5XluHbg97kjOuYU4= go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= diff --git a/internal/core/core.go b/internal/core/core.go index de0423c..8a32957 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log/slog" + "os" "strconv" "sync" "time" @@ -15,6 +16,11 @@ import ( "github.com/vmihailenco/msgpack" "github.com/zerodha/dungbeetle/v2/internal/dbpool" "github.com/zerodha/dungbeetle/v2/models" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" ) // Opt represents core options. @@ -23,6 +29,10 @@ type Opt struct { DefaultGroupConcurrency int DefaultJobTTL time.Duration + // Optional tracing parameters + EnableTracing bool + TraceFilePath string + // DSNs for connecting to the broker backend and the broker state backend. Broker tasqueue.Broker Results tasqueue.Results @@ -400,17 +410,64 @@ type taskMeta struct { TTL int `json:"ttl"` } +func initTracer(lo *slog.Logger, fpath string) (*trace.TracerProvider, error) { + // Write telemetry data to a file. + f, err := os.Create(fpath) + if err != nil { + return nil, err + } + defer f.Close() + + exp, err := stdouttrace.New( + stdouttrace.WithWriter(f), + // Use human-readable output. + stdouttrace.WithPrettyPrint(), + ) + if err != nil { + return nil, err + } + + r, err := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("dungbeetle"), + ), + ) + if err != nil { + return nil, err + } + + tp := trace.NewTracerProvider(trace.WithBatcher(exp), trace.WithResource(r)) + defer func() { + if err := tp.Shutdown(context.Background()); err != nil { + lo.Error("error shutting down trace-provider", "error", err) + } + }() + otel.SetTracerProvider(tp) + + return tp, nil +} + // initQueue creates and returns a distributed queue system (Tasqueue) and registers // Tasks (SQL queries) to be executed. The queue system uses a broker (eg: Kafka) and stores // job states in a state store (eg: Redis) func (co *Core) initQueue() (*tasqueue.Server, error) { - var err error - // TODO: set log level - qs, err := tasqueue.NewServer(tasqueue.ServerOpts{ + opts := tasqueue.ServerOpts{ Broker: co.opt.Broker, Results: co.opt.Results, Logger: co.lo.Handler(), - }) + } + if co.opt.EnableTracing { + tp, err := initTracer(co.lo, co.opt.TraceFilePath) + if err != nil { + return nil, err + } + opts.TraceProvider = tp + } + + // TODO: set log level + qs, err := tasqueue.NewServer(opts) if err != nil { return nil, err } From 45ddc8900130c7caeddfeeddb1a5f1b35f6ab155 Mon Sep 17 00:00:00 2001 From: Lakshay Kalbhor Date: Wed, 23 Oct 2024 16:09:11 +0530 Subject: [PATCH 2/2] refactor: handle trace-file output gracefully --- cmd/config.sample.toml | 2 ++ cmd/init.go | 2 ++ config.test_mysql.toml | 2 ++ config.test_pg.toml | 2 ++ config.toml.sample | 2 ++ internal/core/core.go | 44 ++++++++++++++++++++++++++++++------------ 6 files changed, 42 insertions(+), 12 deletions(-) diff --git a/cmd/config.sample.toml b/cmd/config.sample.toml index c031d9c..21e55d7 100644 --- a/cmd/config.sample.toml +++ b/cmd/config.sample.toml @@ -3,6 +3,8 @@ log_level = "INFO" # maximum time allowed for a job to run once it has started execution default_job_ttl = "60s" +enable_tracing = false +trace_file_path = "./out.traces" # The broker that manages job queuing. # Currently, only "redis" is supported. diff --git a/cmd/init.go b/cmd/init.go index f4e4c5f..bf676f1 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -234,6 +234,8 @@ func initCore(ko *koanf.Koanf) (*core.Core, error) { DefaultQueue: ko.MustString("queue"), DefaultGroupConcurrency: ko.MustInt("worker-concurrency"), DefaultJobTTL: ko.MustDuration("app.default_job_ttl"), + EnableTracing: ko.Bool("app.enable_tracing"), + TraceFilePath: ko.String("app.trace_file_path"), Results: rResult, Broker: rBroker, }, srcPool, backends, lo) diff --git a/config.test_mysql.toml b/config.test_mysql.toml index 5d4aaa2..5e2bfce 100644 --- a/config.test_mysql.toml +++ b/config.test_mysql.toml @@ -1,6 +1,8 @@ [app] log_level = "DEBUG" default_job_ttl = "60s" +enable_tracing = false +trace_file_path = "./out.traces" [job_queue.broker] type = "redis" diff --git a/config.test_pg.toml b/config.test_pg.toml index 5ad2009..9d237fc 100644 --- a/config.test_pg.toml +++ b/config.test_pg.toml @@ -1,6 +1,8 @@ [app] log_level = "DEBUG" default_job_ttl = "60s" +enable_tracing = true +trace_file_path = "./out.traces" [job_queue.broker] type = "redis" diff --git a/config.toml.sample b/config.toml.sample index 1d958a0..28178eb 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -1,6 +1,8 @@ [app] log_level = "DEBUG" default_job_ttl = "60s" +enable_tracing = false +trace_file_path = "./out.traces" [job_queue.broker] type = "redis" diff --git a/internal/core/core.go b/internal/core/core.go index 8a32957..bc3a3d5 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -20,9 +20,24 @@ import ( "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" ) +// traces is a thin wrapper around otel trace-provider which +// is optionally passed to the distributed queue +type traces struct { + f *os.File + tp *trace.TracerProvider +} + +func (t *traces) Close() error { + if err := t.tp.Shutdown(context.Background()); err != nil { + return err + } + + return t.f.Close() +} + // Opt represents core options. type Opt struct { DefaultQueue string @@ -51,7 +66,8 @@ type Core struct { resultBackends ResultBackends // Distributed queue system. - q *tasqueue.Server + tracer *traces + q *tasqueue.Server // Job states for cancellation. jobCtx map[string]context.CancelFunc @@ -84,6 +100,11 @@ func (co *Core) Start(ctx context.Context, workerName string, concurrency int) e co.q = qs qs.Start(ctx) + // Close the tracer once the job processor ends + if co.opt.EnableTracing { + co.tracer.Close() + } + return nil } @@ -410,13 +431,12 @@ type taskMeta struct { TTL int `json:"ttl"` } -func initTracer(lo *slog.Logger, fpath string) (*trace.TracerProvider, error) { +func initTracer(fpath string) (*traces, error) { // Write telemetry data to a file. f, err := os.Create(fpath) if err != nil { return nil, err } - defer f.Close() exp, err := stdouttrace.New( stdouttrace.WithWriter(f), @@ -439,14 +459,12 @@ func initTracer(lo *slog.Logger, fpath string) (*trace.TracerProvider, error) { } tp := trace.NewTracerProvider(trace.WithBatcher(exp), trace.WithResource(r)) - defer func() { - if err := tp.Shutdown(context.Background()); err != nil { - lo.Error("error shutting down trace-provider", "error", err) - } - }() otel.SetTracerProvider(tp) - return tp, nil + return &traces{ + f: f, + tp: tp, + }, nil } // initQueue creates and returns a distributed queue system (Tasqueue) and registers @@ -458,12 +476,14 @@ func (co *Core) initQueue() (*tasqueue.Server, error) { Results: co.opt.Results, Logger: co.lo.Handler(), } + + var err error if co.opt.EnableTracing { - tp, err := initTracer(co.lo, co.opt.TraceFilePath) + co.tracer, err = initTracer(co.opt.TraceFilePath) if err != nil { return nil, err } - opts.TraceProvider = tp + opts.TraceProvider = co.tracer.tp } // TODO: set log level