From 86b28ecd58acc5deb793f6cfbdc76263998341a5 Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 12 Jul 2023 12:43:31 +0300 Subject: [PATCH 1/4] make combiner queries async --- feeds/combiner.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/feeds/combiner.go b/feeds/combiner.go index f8e9bdd3..3b179db2 100644 --- a/feeds/combiner.go +++ b/feeds/combiner.go @@ -12,6 +12,7 @@ import ( "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) var ( @@ -172,19 +173,21 @@ func (cf *combinerFeed) forEachAlert(alertHandlers []cfHandler) error { lowerBound := DefaultLookbackPeriod upperBound := int64(0) + errGrp, ctx := errgroup.WithContext(cf.ctx) + + subscriptions := cf.Subscriptions() // Query all subscriptions and process alerts - for _, subscription := range cf.Subscriptions() { - logger = logger.WithFields( - log.Fields{ - "subscriberBotId": subscription.Subscriber.BotID, - "subscribedBotId": subscription.Subscription.BotId, + for _, subscription := range subscriptions { + errGrp.Go( + func() error { + return cf.fetchAlertsAndHandle(ctx, alertHandlers, subscription, lowerBound.Milliseconds(), upperBound) }, ) + } - err := cf.fetchAlertsAndHandle(cf.ctx, alertHandlers, subscription, lowerBound.Milliseconds(), upperBound) - if err != nil { - logger.WithError(err).Warn("failed to fetch alerts and handle") - } + err := errGrp.Wait() + if err != nil{ + logger.WithError(err).Warn("failed to process combiner subscriptions") } // Save alert cache to persistent file, if configured From e575ea24709c2c8e3d9a11555b3407b714f11d7c Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 12 Jul 2023 12:50:29 +0300 Subject: [PATCH 2/4] refactor query loop --- feeds/combiner.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/feeds/combiner.go b/feeds/combiner.go index 3b179db2..8e0e0e94 100644 --- a/feeds/combiner.go +++ b/feeds/combiner.go @@ -177,7 +177,8 @@ func (cf *combinerFeed) forEachAlert(alertHandlers []cfHandler) error { subscriptions := cf.Subscriptions() // Query all subscriptions and process alerts - for _, subscription := range subscriptions { + for i := range subscriptions { + subscription := subscriptions[i] errGrp.Go( func() error { return cf.fetchAlertsAndHandle(ctx, alertHandlers, subscription, lowerBound.Milliseconds(), upperBound) From e56ea53754fcc7bb138fed88b7f3c8fbaa9b1951 Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 12 Jul 2023 19:46:20 +0300 Subject: [PATCH 3/4] use waitgroup instead of error group --- feeds/combiner.go | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/feeds/combiner.go b/feeds/combiner.go index 8e0e0e94..261d0b25 100644 --- a/feeds/combiner.go +++ b/feeds/combiner.go @@ -173,23 +173,29 @@ func (cf *combinerFeed) forEachAlert(alertHandlers []cfHandler) error { lowerBound := DefaultLookbackPeriod upperBound := int64(0) - errGrp, ctx := errgroup.WithContext(cf.ctx) - subscriptions := cf.Subscriptions() + var wg sync.WaitGroup + wg.Add(len(subscriptions)) + // Query all subscriptions and process alerts for i := range subscriptions { subscription := subscriptions[i] - errGrp.Go( - func() error { - return cf.fetchAlertsAndHandle(ctx, alertHandlers, subscription, lowerBound.Milliseconds(), upperBound) - }, - ) + go func() { + defer wg.Done() + logger := logger.WithFields( + log.Fields{ + "subscriberBotId": subscription.Subscriber.BotID, + "subscribedBotId": subscription.Subscription.BotId, + }, + ) + err := cf.fetchAlertsAndHandle(cf.ctx, alertHandlers, subscription, lowerBound.Milliseconds(), upperBound) + if err != nil { + logger.WithError(err).Warn("failed to process combiner subscription") + } + }() } - err := errGrp.Wait() - if err != nil{ - logger.WithError(err).Warn("failed to process combiner subscriptions") - } + wg.Wait() // Save alert cache to persistent file, if configured if cf.cfg.CombinerCachePath != "" { From 44aea6d21cf2a44b134469fb4c6b6fcb4e44ffbe Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 12 Jul 2023 22:29:59 +0300 Subject: [PATCH 4/4] go mod tidy --- feeds/combiner.go | 1 - 1 file changed, 1 deletion(-) diff --git a/feeds/combiner.go b/feeds/combiner.go index 261d0b25..e29714ee 100644 --- a/feeds/combiner.go +++ b/feeds/combiner.go @@ -12,7 +12,6 @@ import ( "github.com/forta-network/forta-core-go/domain" "github.com/forta-network/forta-core-go/protocol" log "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" ) var (