Skip to content

Commit

Permalink
feat(outputs): Apply filter to outputs (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm authored Mar 24, 2023
1 parent 058655b commit a97fac7
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 11 deletions.
6 changes: 6 additions & 0 deletions example_sentry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ ethereum:
outputs:
- name: http-sink
type: http
# filter:
# eventNames:
# - BEACON_API_ETH_V1_EVENTS_BLOCK
config:
address: http://localhost:8080
headers:
Expand All @@ -30,6 +33,9 @@ outputs:
maxExportBatchSize: 512
- name: xatu-server
type: xatu
# filter:
# eventNames:
# - BEACON_API_ETH_V1_EVENTS_BLOCK
config:
address: localhost:8080
tls: false
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimicry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) {
sinks := make([]output.Sink, len(c.Outputs))

for i, out := range c.Outputs {
sink, err := output.NewSink(out.SinkType, out.Config, log)
sink, err := output.NewSink(out.SinkType, out.Config, log, out.FilterConfig)
if err != nil {
return nil, err
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/output/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethpandaops/xatu/pkg/output/http"
"github.com/ethpandaops/xatu/pkg/output/stdout"
"github.com/ethpandaops/xatu/pkg/output/xatu"
pxatu "github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/sirupsen/logrus"
)

Expand All @@ -16,6 +17,8 @@ type Config struct {
SinkType SinkType `yaml:"type"`

Config *RawMessage `yaml:"config"`

FilterConfig pxatu.EventFilterConfig `yaml:"filter"`
}

func (c *Config) Validate() error {
Expand All @@ -26,7 +29,7 @@ func (c *Config) Validate() error {
return nil
}

func NewSink(sinkType SinkType, config *RawMessage, log logrus.FieldLogger) (Sink, error) {
func NewSink(sinkType SinkType, config *RawMessage, log logrus.FieldLogger, filterConfig pxatu.EventFilterConfig) (Sink, error) {
if sinkType == SinkTypeUnknown {
return nil, errors.New("sink type is required")
}
Expand All @@ -45,7 +48,7 @@ func NewSink(sinkType SinkType, config *RawMessage, log logrus.FieldLogger) (Sin
return nil, err
}

return http.New(conf, log)
return http.New(conf, log, &filterConfig)
case SinkTypeStdOut:
conf := &stdout.Config{}

Expand All @@ -59,7 +62,7 @@ func NewSink(sinkType SinkType, config *RawMessage, log logrus.FieldLogger) (Sin
return nil, err
}

return stdout.New(conf, log)
return stdout.New(conf, log, &filterConfig)
case SinkTypeXatu:
conf := &xatu.Config{}

Expand All @@ -73,7 +76,7 @@ func NewSink(sinkType SinkType, config *RawMessage, log logrus.FieldLogger) (Sin
return nil, err
}

return xatu.New(conf, log)
return xatu.New(conf, log, &filterConfig)
default:
return nil, fmt.Errorf("sink type %s is unknown", sinkType)
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ type HTTP struct {
config *Config
log logrus.FieldLogger
proc *processor.BatchItemProcessor[xatu.DecoratedEvent]
filter xatu.EventFilter
}

func New(config *Config, log logrus.FieldLogger) (*HTTP, error) {
func New(config *Config, log logrus.FieldLogger, filterConfig *xatu.EventFilterConfig) (*HTTP, error) {
if config == nil {
return nil, errors.New("config is required")
}
Expand All @@ -31,6 +32,11 @@ func New(config *Config, log logrus.FieldLogger) (*HTTP, error) {
return nil, err
}

filter, err := xatu.NewEventFilter(filterConfig)
if err != nil {
return nil, err
}

proc := processor.NewBatchItemProcessor[xatu.DecoratedEvent](exporter,
log,
processor.WithMaxQueueSize(config.MaxQueueSize),
Expand All @@ -43,6 +49,7 @@ func New(config *Config, log logrus.FieldLogger) (*HTTP, error) {
config: config,
log: log,
proc: proc,
filter: filter,
}, nil
}

Expand All @@ -59,6 +66,15 @@ func (h *HTTP) Stop(ctx context.Context) error {
}

func (h *HTTP) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error {
shouldBeDropped, err := h.filter.ShouldBeDropped(event)
if err != nil {
return err
}

if shouldBeDropped {
return nil
}

h.proc.Write(event)

return nil
Expand Down
18 changes: 17 additions & 1 deletion pkg/output/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ type StdOut struct {
config *Config
log logrus.FieldLogger
proc *processor.BatchItemProcessor[xatu.DecoratedEvent]
filter xatu.EventFilter
}

func New(config *Config, log logrus.FieldLogger) (*StdOut, error) {
func New(config *Config, log logrus.FieldLogger, filterConfig *xatu.EventFilterConfig) (*StdOut, error) {
if config == nil {
return nil, errors.New("config is required")
}
Expand All @@ -31,12 +32,18 @@ func New(config *Config, log logrus.FieldLogger) (*StdOut, error) {
return nil, err
}

filter, err := xatu.NewEventFilter(filterConfig)
if err != nil {
return nil, err
}

proc := processor.NewBatchItemProcessor[xatu.DecoratedEvent](exporter, log)

return &StdOut{
config: config,
log: log,
proc: proc,
filter: filter,
}, nil
}

Expand All @@ -53,6 +60,15 @@ func (h *StdOut) Stop(ctx context.Context) error {
}

func (h *StdOut) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error {
shouldBeDropped, err := h.filter.ShouldBeDropped(event)
if err != nil {
return err
}

if shouldBeDropped {
return nil
}

h.proc.Write(event)

return nil
Expand Down
18 changes: 17 additions & 1 deletion pkg/output/xatu/xatu.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ type Xatu struct {
config *Config
log logrus.FieldLogger
proc *processor.BatchItemProcessor[xatu.DecoratedEvent]
filter xatu.EventFilter
}

func New(config *Config, log logrus.FieldLogger) (*Xatu, error) {
func New(config *Config, log logrus.FieldLogger, filterConfig *xatu.EventFilterConfig) (*Xatu, error) {
if config == nil {
return nil, errors.New("config is required")
}
Expand All @@ -31,6 +32,11 @@ func New(config *Config, log logrus.FieldLogger) (*Xatu, error) {
return nil, err
}

filter, err := xatu.NewEventFilter(filterConfig)
if err != nil {
return nil, err
}

proc := processor.NewBatchItemProcessor[xatu.DecoratedEvent](exporter,
log,
processor.WithMaxQueueSize(config.MaxQueueSize),
Expand All @@ -43,6 +49,7 @@ func New(config *Config, log logrus.FieldLogger) (*Xatu, error) {
config: config,
log: log,
proc: proc,
filter: filter,
}, nil
}

Expand All @@ -59,6 +66,15 @@ func (h *Xatu) Stop(ctx context.Context) error {
}

func (h *Xatu) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error {
shouldBeDropped, err := h.filter.ShouldBeDropped(event)
if err != nil {
return err
}

if shouldBeDropped {
return nil
}

h.proc.Write(event)

return nil
Expand Down
14 changes: 13 additions & 1 deletion pkg/proto/xatu/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,19 @@ func (f *eventFilter) Apply(events []*DecoratedEvent) ([]*DecoratedEvent, error)
}

func (f *eventFilter) ShouldBeDropped(event *DecoratedEvent) (bool, error) {
if len(f.eventNames) == 0 {
return false, nil
}

return f.applyEventNamesFilter(event), nil
}

func (f *eventFilter) applyEventNamesFilter(event *DecoratedEvent) bool {
if len(f.eventNames) == 0 {
return false
}

_, ok := f.eventNames[event.Event.Name.String()]

return !ok, nil
return !ok
}
22 changes: 22 additions & 0 deletions pkg/proto/xatu/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,25 @@ func TestEventFilter_ShouldBeDropped(t *testing.T) {

assert.True(t, shouldBeDropped)
}

func TestEventFilter_AllowEverythingWhenEmpty(t *testing.T) {
events := []*DecoratedEvent{}

for _, eventName := range Event_Name_value {
events = append(events, &DecoratedEvent{
Event: &Event{
Name: Event_Name(eventName),
},
})
}

emptyConfig := &EventFilterConfig{}
filter, _ := NewEventFilter(emptyConfig)

filteredEvents, err := filter.Apply(events)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, events, filteredEvents)
}
2 changes: 1 addition & 1 deletion pkg/sentry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) {
sinks := make([]output.Sink, len(c.Outputs))

for i, out := range c.Outputs {
sink, err := output.NewSink(out.SinkType, out.Config, log)
sink, err := output.NewSink(out.SinkType, out.Config, log, out.FilterConfig)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/service/event-ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (e *Ingester) CreateSinks() ([]output.Sink, error) {
sinks := make([]output.Sink, len(e.config.Outputs))

for i, out := range e.config.Outputs {
sink, err := output.NewSink(out.SinkType, out.Config, e.log)
sink, err := output.NewSink(out.SinkType, out.Config, e.log, out.FilterConfig)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a97fac7

Please sign in to comment.