diff --git a/internal/events/eventstream.go b/internal/events/eventstream.go index c88e628a..f6f4a26e 100644 --- a/internal/events/eventstream.go +++ b/internal/events/eventstream.go @@ -404,7 +404,6 @@ func (a *eventStream) eventPoller() { ctx := auth.NewSystemAuthContext() var checkpoint map[string]*big.Int - blockUpdatedFilterStale := false for !a.suspendOrStop() { var err error // Load the checkpoint (should only be first time round) @@ -428,14 +427,12 @@ func (a *eventStream) eventPoller() { blockHeight, exists := checkpoint[sub.info.ID] if !exists || blockHeight.Cmp(big.NewInt(0)) <= 0 { blockHeight, err = sub.setInitialBlockHeight(ctx) - } else { + } else if !sub.inCatchupMode() { sub.setCheckpointBlockHeight(blockHeight) } if err == nil { err = sub.restartFilter(ctx, blockHeight) } - blockUpdatedFilterStale = true - log.Debugf("%s: Checkpoint updated due to stale sub filter: %s", sub.info.ID, blockHeight.String()) } if err == nil { err = sub.processNewEvents(ctx) @@ -453,15 +450,17 @@ func (a *eventStream) eventPoller() { i1 := checkpoint[sub.info.ID] i2 := sub.blockHWM() - changed = changed || blockUpdatedFilterStale || i1 == nil || i1.Cmp(&i2) != 0 + subChanged := i1 == nil || i1.Cmp(&i2) != 0 + if subChanged { + log.Debugf("%s: New checkpoint HWM: %s", a.spec.ID, i2.String()) + } + changed = changed || subChanged checkpoint[sub.info.ID] = new(big.Int).Set(&i2) } if changed { if err = a.sm.storeCheckpoint(a.spec.ID, checkpoint); err != nil { log.Errorf("%s: Failed to store checkpoint: %s", a.spec.ID, err) } - // set this to false before the next evaluation for stale subscriptions - blockUpdatedFilterStale = false } } // the event poller reacts to notification about a stream update, else it starts diff --git a/internal/events/logprocessor.go b/internal/events/logprocessor.go index 8f80d511..0c06f2c7 100644 --- a/internal/events/logprocessor.go +++ b/internal/events/logprocessor.go @@ -67,11 +67,13 @@ type logProcessor struct { } func newLogProcessor(subID string, event *ethbinding.ABIEvent, stream *eventStream) *logProcessor { - return &logProcessor{ + lp := &logProcessor{ subID: subID, event: event, stream: stream, } + lp.highestDispatched.SetInt64(-1) + return lp } func (lp *logProcessor) batchComplete(newestEvent *eventData) { @@ -96,9 +98,8 @@ func (lp *logProcessor) getBlockHWM() big.Int { func (lp *logProcessor) markNoEvents(blockNumber *big.Int) { lp.hwnSync.Lock() if lp.highestDispatched.Cmp(&lp.blockHWM) < 0 { - // Nothing in-flight, its safe to update the HWM - lp.blockHWM.Set(blockNumber) - log.Debugf("%s: HWM: %s", lp.subID, lp.blockHWM.String()) + // Nothing in-flight, its safe to update the HWM - to one after the block we're up to + lp.blockHWM.Set(new(big.Int).Add(blockNumber, big.NewInt(1))) } lp.hwnSync.Unlock() } diff --git a/internal/events/submanager.go b/internal/events/submanager.go index 3fd095b5..05e19a4d 100644 --- a/internal/events/submanager.go +++ b/internal/events/submanager.go @@ -122,6 +122,10 @@ func NewSubscriptionManager(conf *SubscriptionManagerConf, rpc eth.RPCClient, cr if conf.CatchupModePageSize <= 0 { conf.CatchupModePageSize = defaultCatchupModePageSize } + if conf.CatchupModeBlockGap < conf.CatchupModePageSize { + log.Warnf("catchupModeBlockGap=%d must be >= catchupModePageSize=%d - setting to %d", conf.CatchupModeBlockGap, conf.CatchupModePageSize, conf.CatchupModePageSize) + conf.CatchupModeBlockGap = conf.CatchupModePageSize + } return sm } @@ -410,7 +414,7 @@ func (s *subscriptionMGR) loadCheckpoint(streamID string) (map[string]*big.Int, func (s *subscriptionMGR) storeCheckpoint(streamID string, checkpoint map[string]*big.Int) error { cpID := checkpointIDPrefix + streamID b, _ := json.MarshalIndent(&checkpoint, "", " ") - log.Debugf("Storing checkpoint %s: %s", cpID, string(b)) + log.Tracef("Storing checkpoint %s: %s", cpID, string(b)) return s.db.Put(cpID, b) } diff --git a/internal/events/submanager_test.go b/internal/events/submanager_test.go index e4893be4..3ca03e17 100644 --- a/internal/events/submanager_test.go +++ b/internal/events/submanager_test.go @@ -79,6 +79,17 @@ func newTestSubscriptionManager() *subscriptionMGR { return sm } +func TestNestSubscriptionManagerBlockGapValidation(t *testing.T) { + smconf := &SubscriptionManagerConf{ + CatchupModeBlockGap: 10, + CatchupModePageSize: 1000, + } + rpc := ðmocks.RPCClient{} + cr := &contractregistrymocks.ContractStore{} + sm := NewSubscriptionManager(smconf, rpc, cr, newMockWebSocket()).(*subscriptionMGR) + assert.Equal(t, int64(1000), sm.conf.CatchupModeBlockGap) +} + func TestCobraInitSubscriptionManager(t *testing.T) { assert := assert.New(t) cmd := cobra.Command{} diff --git a/internal/events/subscription.go b/internal/events/subscription.go index d44581d9..7d0731d5 100644 --- a/internal/events/subscription.go +++ b/internal/events/subscription.go @@ -225,7 +225,7 @@ func (s *subscription) restartFilter(ctx context.Context, checkpoint *big.Int) e defer cancel() since := checkpoint - if s.catchupBlock != nil { + if s.inCatchupMode() { // If we're already in catchup mode, we need to look at the current catchupBlock, // not the checkpoint. since = s.catchupBlock @@ -238,7 +238,7 @@ func (s *subscription) restartFilter(ctx context.Context, checkpoint *big.Int) e } blockGap := new(big.Int).Sub(blockNumber.ToInt(), since).Int64() - log.Debugf("%s: restarting. Head=%s Position=%s Gap=%d (catchup threshold: %d)", s.logName, blockNumber.ToInt().String(), since.String(), blockGap, s.catchupModeBlockGap) + log.Debugf("%s: new filter. Head=%s Position=%s Gap=%d (catchup threshold: %d)", s.logName, blockNumber.ToInt().String(), since.String(), blockGap, s.catchupModeBlockGap) if s.catchupModeBlockGap > 0 && blockGap > s.catchupModeBlockGap { s.catchupBlock = since // note if we were already in catchup, this does not change anything return nil @@ -247,6 +247,10 @@ func (s *subscription) restartFilter(ctx context.Context, checkpoint *big.Int) e return s.createFilter(ctx, since) } +func (s *subscription) inCatchupMode() bool { + return s.catchupBlock != nil +} + // getEventTimestamp adds the block timestamp to the log entry. // It uses a lru cache (blocknumber, timestamp) in the eventstream to determine the timestamp // and falls back to querying the node if we don't have timestamp in the cache (at which point it gets