From 9acdf257fd7ffbd6946ba34a75a5c2ace0504189 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Tue, 21 Feb 2023 09:47:10 +1000 Subject: [PATCH] chore(lint): update golangci-lint and rules --- .github/workflows/golangci-lint.yaml | 2 +- .golangci.yml | 25 +++++++++++++++---- docs/discovery.md | 4 +-- docs/mimicry.md | 10 ++++---- example_discovery.yaml | 2 +- example_mimicry.yaml | 2 +- pkg/discovery/discovery.go | 2 ++ pkg/discovery/p2p/peer.go | 3 +-- pkg/discovery/p2p/raw_message.go | 1 + pkg/discovery/p2p/status.go | 1 + pkg/discovery/p2p/xatu/config.go | 2 +- pkg/discovery/p2p/xatu/xatu.go | 5 ++++ pkg/mimicry/coordinator/raw_message.go | 1 + pkg/mimicry/coordinator/static/static.go | 1 + .../coordinator/xatu/coordinator/config.go | 4 +-- pkg/mimicry/coordinator/xatu/peer/peer.go | 1 + pkg/mimicry/coordinator/xatu/xatu.go | 4 ++- .../p2p/execution/event_transaction.go | 5 +++- pkg/mimicry/p2p/execution/execution.go | 4 ++- pkg/output/raw_message.go | 1 + pkg/processor/batch_test.go | 3 +++ .../event/beacon/eth/v1/events_attestation.go | 5 ++-- .../event/beacon/eth/v1/events_block.go | 5 ++-- .../event/beacon/eth/v1/events_chain_reorg.go | 5 ++-- .../eth/v1/events_contribution_and_proof.go | 5 ++-- .../eth/v1/events_finalized_checkpoint.go | 5 ++-- pkg/sentry/event/beacon/eth/v1/events_head.go | 5 ++-- .../beacon/eth/v1/events_voluntary_exit.go | 5 ++-- .../event/beacon/eth/v2/beacon_block.go | 5 ++-- pkg/server/config.go | 4 +-- pkg/server/geoip/maxmind/maxmind.go | 2 ++ pkg/server/geoip/raw_message.go | 1 + pkg/server/metrics.go | 10 -------- pkg/server/persistence/node/activity.go | 8 +++--- pkg/server/persistence/node/execution.go | 14 +++++------ pkg/server/persistence/node/parse.go | 2 ++ pkg/server/persistence/node/record.go | 12 ++++----- pkg/server/server.go | 10 ++++---- pkg/server/service/coordinator/client.go | 2 +- .../event/beacon/eth/v2/beacon_block.go | 6 +++++ pkg/server/service/event-ingester/handler.go | 3 +++ pkg/server/service/raw_message.go | 1 + pkg/server/store/memory/memory.go | 2 ++ pkg/server/store/raw_message.go | 1 + pkg/server/store/redis/cluster/cluster.go | 4 +++ pkg/server/store/redis/server/server.go | 5 +++- 46 files changed, 135 insertions(+), 75 deletions(-) delete mode 100644 pkg/server/metrics.go diff --git a/.github/workflows/golangci-lint.yaml b/.github/workflows/golangci-lint.yaml index 26ffb2da..273c907f 100644 --- a/.github/workflows/golangci-lint.yaml +++ b/.github/workflows/golangci-lint.yaml @@ -23,7 +23,7 @@ jobs: uses: golangci/golangci-lint-action@v3 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: v1.50.1 + version: v1.51.2 # Optional: working directory, useful for monorepos # working-directory: somedir diff --git a/.golangci.yml b/.golangci.yml index a1de6705..1058a108 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -20,33 +20,48 @@ linters-settings: linters: disable-all: true enable: + - asasalint + - bidichk - bodyclose - - deadcode + - containedctx + - decorder - depguard - dogsled + - durationcheck - errcheck + - errname - exportloopref - goconst - gocritic + - gocyclo - gofmt + - goheader - goimports - - gocyclo - gosec - gosimple - govet + - ineffassign - misspell - - nolintlint - nakedret + - nilerr + - nilerr + - nilnil + - nlreturn + - nolintlint + - nosprintfhostport - prealloc + - predeclared + - promlinter + - reassign - revive - staticcheck - - structcheck - stylecheck + - tagliatelle - thelper - tparallel - typecheck - unconvert - - varcheck + - unused - whitespace - wsl diff --git a/docs/discovery.md b/docs/discovery.md index 38342e2a..860b009b 100644 --- a/docs/discovery.md +++ b/docs/discovery.md @@ -60,7 +60,7 @@ P2P configuration to get node records to discover from the [Xatu server coordina | p2p.config.discV5 | bool | `true` | enable Node Discovery Protocol v5 *Note: both Node Discovery Protocol v4 and v5 can be enabled at the same time* | | p2p.config.restart | string | `2m` | Time between initiating discovery scans and fetching new node record, will generate a fresh private key each time | | p2p.config.networkIDs | array | | List of network ids to filter node records by (decimal format, eg. '1' for mainnet) | -| p2p.config.forkIDHashes | array | | List of [Fork ID hash](https://eips.ethereum.org/EIPS/eip-2124) to filter node records by (hex string) | +| p2p.config.forkIdHashes | array | | List of [Fork ID hash](https://eips.ethereum.org/EIPS/eip-2124) to filter node records by (hex string) | ### P2P `static` configuration @@ -111,7 +111,7 @@ p2p: discV5: true restart: 2m networkIDs: [1] - forkIDHashes: [0xf0afd0e3] + forkIdHashes: [0xf0afd0e3] ``` ## Running locally diff --git a/docs/mimicry.md b/docs/mimicry.md index d541545f..dfaa4d0b 100644 --- a/docs/mimicry.md +++ b/docs/mimicry.md @@ -98,7 +98,7 @@ Output configuration to send mimicry events to a [Xatu server](./server.md). | outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled | | outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay | | outputs[].config.networkIDs | array | | List of network ids to connect to (decimal format, eg. '1' for mainnet) | -| outputs[].config.forkIDHashes | array | | List of [Fork ID hash](https://eips.ethereum.org/EIPS/eip-2124) to connect to (hex string) | +| outputs[].config.forkIdHashes | array | | List of [Fork ID hash](https://eips.ethereum.org/EIPS/eip-2124) to connect to (hex string) | | outputs[].config.maxPeers | int | `100` | Max number of peers to attempt to connect to simultaneously | ### Output `http` configuration @@ -124,7 +124,7 @@ coordinator: config: address: localhost:8080 networkIDs: [1] - forkIDHashes: [0xf0afd0e3] + forkIdHashes: [0xf0afd0e3] maxPeers: 100 outputs: @@ -159,7 +159,7 @@ coordinator: config: address: localhost:8080 networkIDs: [1] - forkIDHashes: [0xf0afd0e3] + forkIdHashes: [0xf0afd0e3] outputs: - name: xatu-output @@ -178,7 +178,7 @@ coordinator: config: address: localhost:8080 networkIDs: [1] - forkIDHashes: [0xf0afd0e3] + forkIdHashes: [0xf0afd0e3] outputs: - name: http-basic-auth @@ -207,7 +207,7 @@ coordinator: config: address: localhost:8080 networkIDs: [1] - forkIDHashes: [0xf0afd0e3] + forkIdHashes: [0xf0afd0e3] outputs: - name: log diff --git a/example_discovery.yaml b/example_discovery.yaml index 9d6e1848..323cbbc0 100644 --- a/example_discovery.yaml +++ b/example_discovery.yaml @@ -75,4 +75,4 @@ p2p: # discV5: true # restart: 2m # networkIDs: [1] - # forkIDHashes: [0xf0afd0e3] + # forkIdHashes: [0xf0afd0e3] diff --git a/example_mimicry.yaml b/example_mimicry.yaml index e253e5f6..4c083eef 100644 --- a/example_mimicry.yaml +++ b/example_mimicry.yaml @@ -28,7 +28,7 @@ coordinator: # headers: # authorization: Someb64Value # networkIDs: [1] - # forkIDHashes: [0xf0afd0e3] + # forkIdHashes: [0xf0afd0e3] # maxPeers: 100 outputs: diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 8a82f5ef..9be83a44 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -161,6 +161,7 @@ func (d *Discovery) startCrons(ctx context.Context) error { nodeRecords, err := d.coordinator.ListStaleNodeRecords(ctx) if err != nil { d.log.WithError(err).Error("Failed to list stale node records") + return } d.log.WithField("records", len(nodeRecords)).Info("Adding stale node records to status") @@ -177,6 +178,7 @@ func (d *Discovery) startCrons(ctx context.Context) error { func (d *Discovery) handleExecutionStatus(ctx context.Context, status *xatu.ExecutionNodeStatus) error { d.metrics.AddNodeRecordStatus(1, fmt.Sprintf("%d", status.GetNetworkId()), fmt.Sprintf("0x%x", status.GetForkId().GetHash())) + return d.coordinator.HandleExecutionNodeRecordStatus(ctx, status) } diff --git a/pkg/discovery/p2p/peer.go b/pkg/discovery/p2p/peer.go index 2aa87021..c13e49f3 100644 --- a/pkg/discovery/p2p/peer.go +++ b/pkg/discovery/p2p/peer.go @@ -20,8 +20,6 @@ type Peer struct { hello *mimicry.Hello handlerFunc func(ctx context.Context, status *xatu.ExecutionNodeStatus) - - response chan error } func NewPeer(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handlerFunc func(ctx context.Context, status *xatu.ExecutionNodeStatus)) (*Peer, error) { @@ -43,6 +41,7 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) { p.client.OnHello(ctx, func(ctx context.Context, hello *mimicry.Hello) error { p.hello = hello + return nil }) diff --git a/pkg/discovery/p2p/raw_message.go b/pkg/discovery/p2p/raw_message.go index 1e6f1dc8..42af6f8a 100644 --- a/pkg/discovery/p2p/raw_message.go +++ b/pkg/discovery/p2p/raw_message.go @@ -6,6 +6,7 @@ type RawMessage struct { func (r *RawMessage) UnmarshalYAML(unmarshal func(interface{}) error) error { r.unmarshal = unmarshal + return nil } diff --git a/pkg/discovery/p2p/status.go b/pkg/discovery/p2p/status.go index fe6cfdb1..a9c5fcac 100644 --- a/pkg/discovery/p2p/status.go +++ b/pkg/discovery/p2p/status.go @@ -111,6 +111,7 @@ func (s *Status) AddExecutionNodeRecords(ctx context.Context, nodeRecords []stri retry.Attempts(5), retry.DelayType(func(n uint, err error, config *retry.Config) time.Duration { s.log.WithError(err).Debug("peer failed") + return 5 * time.Second }), ) diff --git a/pkg/discovery/p2p/xatu/config.go b/pkg/discovery/p2p/xatu/config.go index ab437072..895c8782 100644 --- a/pkg/discovery/p2p/xatu/config.go +++ b/pkg/discovery/p2p/xatu/config.go @@ -12,7 +12,7 @@ type Config struct { DiscV5 bool `yaml:"discV5" default:"true"` Restart time.Duration `yaml:"restart" default:"2m"` NetworkIDs []uint64 `yaml:"networkIDs"` - ForkIDHashes []string `yaml:"forkIDHashes"` + ForkIDHashes []string `yaml:"forkIdHashes"` } func (c *Config) Validate() error { diff --git a/pkg/discovery/p2p/xatu/xatu.go b/pkg/discovery/p2p/xatu/xatu.go index e4a9f811..b2e9f5ba 100644 --- a/pkg/discovery/p2p/xatu/xatu.go +++ b/pkg/discovery/p2p/xatu/xatu.go @@ -142,26 +142,31 @@ func (c *Coordinator) startCrons(ctx context.Context) error { if err != nil { c.log.WithError(err).Error("Failed to get a discovery node record") + return } if err = c.discV4.UpdateBootNodes([]string{res.NodeRecord}); err != nil { c.log.WithError(err).Error("Failed to update discV4 boot nodes") + return } if errS := c.discV4.Start(ctx); errS != nil { c.log.WithError(errS).Error("Failed to start discV4") + return } if err = c.discV5.UpdateBootNodes([]string{res.NodeRecord}); err != nil { c.log.WithError(err).Error("Failed to update discV5 boot nodes") + return } if err := c.discV5.Start(ctx); err != nil { c.log.WithError(err).Error("Failed to start discV5") + return } }); err != nil { diff --git a/pkg/mimicry/coordinator/raw_message.go b/pkg/mimicry/coordinator/raw_message.go index 2f26c47d..87b538bb 100644 --- a/pkg/mimicry/coordinator/raw_message.go +++ b/pkg/mimicry/coordinator/raw_message.go @@ -6,6 +6,7 @@ type RawMessage struct { func (r *RawMessage) UnmarshalYAML(unmarshal func(interface{}) error) error { r.unmarshal = unmarshal + return nil } diff --git a/pkg/mimicry/coordinator/static/static.go b/pkg/mimicry/coordinator/static/static.go index a5b22e0e..2113ccb2 100644 --- a/pkg/mimicry/coordinator/static/static.go +++ b/pkg/mimicry/coordinator/static/static.go @@ -85,6 +85,7 @@ func (s *Static) Start(ctx context.Context) error { retry.Attempts(0), retry.DelayType(func(n uint, err error, config *retry.Config) time.Duration { s.log.WithError(err).Debug("peer failed") + return s.config.RetryInterval }), ) diff --git a/pkg/mimicry/coordinator/xatu/coordinator/config.go b/pkg/mimicry/coordinator/xatu/coordinator/config.go index 994f01fb..c3e559e1 100644 --- a/pkg/mimicry/coordinator/xatu/coordinator/config.go +++ b/pkg/mimicry/coordinator/xatu/coordinator/config.go @@ -8,8 +8,8 @@ type Config struct { Address string `yaml:"address"` Headers map[string]string `yaml:"headers"` TLS bool `yaml:"tls" default:"false"` - NetworkIDs []uint64 `yaml:"networkIDs"` - ForkIDHashes []string `yaml:"forkIDHashes"` + NetworkIDs []uint64 `yaml:"networkIds"` + ForkIDHashes []string `yaml:"forkIdHashes"` MaxPeers uint32 `yaml:"maxPeers" default:"100"` } diff --git a/pkg/mimicry/coordinator/xatu/peer/peer.go b/pkg/mimicry/coordinator/xatu/peer/peer.go index bedf7fc1..6be74903 100644 --- a/pkg/mimicry/coordinator/xatu/peer/peer.go +++ b/pkg/mimicry/coordinator/xatu/peer/peer.go @@ -93,6 +93,7 @@ func (p *Peer) Start(ctx context.Context) error { func (p *Peer) Stop() error { p.stopped = true + return nil } diff --git a/pkg/mimicry/coordinator/xatu/xatu.go b/pkg/mimicry/coordinator/xatu/xatu.go index 3621367d..f718269b 100644 --- a/pkg/mimicry/coordinator/xatu/xatu.go +++ b/pkg/mimicry/coordinator/xatu/xatu.go @@ -18,7 +18,6 @@ const Type = "xatu" type Xatu struct { handlers *handler.Peer - name string log logrus.FieldLogger cache *cache.SharedCache @@ -106,11 +105,13 @@ func (x *Xatu) startCrons(ctx context.Context) error { res, err := x.coordinator.CoordinateExecutionNodeRecords(ctx, records) if err != nil { x.log.WithError(err).Error("failed to coordinate execution node records") + return } if res == nil { x.log.Error("failed to coordinate execution node records: nil response") + return } @@ -122,6 +123,7 @@ func (x *Xatu) startCrons(ctx context.Context) error { if record == i { found = true peer.RetryDelay(retryDelay) + break } } diff --git a/pkg/mimicry/p2p/execution/event_transaction.go b/pkg/mimicry/p2p/execution/event_transaction.go index b9e6cd0c..026bba08 100644 --- a/pkg/mimicry/p2p/execution/event_transaction.go +++ b/pkg/mimicry/p2p/execution/event_transaction.go @@ -2,6 +2,7 @@ package execution import ( "context" + "errors" "fmt" "strconv" "time" @@ -31,7 +32,7 @@ func (p *Peer) handleTransaction(ctx context.Context, eventTime time.Time, event "transaction_hash": event.Hash().String(), }).Debug("Duplicate transaction event received") // TODO(savid): add metrics - return nil, nil + return nil, errors.New("duplicate transaction event received") } meta, err := p.createNewClientMeta(ctx) @@ -78,6 +79,7 @@ func (p *Peer) getTransactionData(ctx context.Context, event *types.Transaction, from, err := p.signer.Sender(event) if err != nil { p.log.WithError(err).Error("failed to get sender") + return nil, err } @@ -138,6 +140,7 @@ func (p *Peer) ExportTransactions(ctx context.Context, items []*TransactionHashI txs, err := p.client.GetPooledTransactions(ctx, hashes) if err != nil { p.log.WithError(err).Error("Failed to get pooled transactions") + return } diff --git a/pkg/mimicry/p2p/execution/execution.go b/pkg/mimicry/p2p/execution/execution.go index 1c41195e..d7d2d94c 100644 --- a/pkg/mimicry/p2p/execution/execution.go +++ b/pkg/mimicry/p2p/execution/execution.go @@ -76,7 +76,7 @@ func New(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handler func (p *Peer) createNewClientMeta(ctx context.Context) (*xatu.ClientMeta, error) { if p.handlers.CreateNewClientMeta == nil { - return nil, nil + return nil, errors.New("no CreateNewClientMeta handler") } meta, err := p.handlers.CreateNewClientMeta(ctx) @@ -244,6 +244,7 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) { } } } + return nil }) @@ -267,6 +268,7 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) { err = p.client.Start(ctx) if err != nil { p.log.WithError(err).Debug("failed to dial client") + return nil, err } diff --git a/pkg/output/raw_message.go b/pkg/output/raw_message.go index b6386cb1..aab9a326 100644 --- a/pkg/output/raw_message.go +++ b/pkg/output/raw_message.go @@ -6,6 +6,7 @@ type RawMessage struct { func (r *RawMessage) UnmarshalYAML(unmarshal func(interface{}) error) error { r.unmarshal = unmarshal + return nil } diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 09ed6a44..6c75ca7d 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -51,6 +51,7 @@ func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) erro select { case <-ctx.Done(): t.err = ctx.Err() + return ctx.Err() default: } @@ -64,6 +65,7 @@ func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) erro func (t *testBatchExporter[T]) Shutdown(context.Context) error { t.shutdownCount++ + return nil } @@ -379,6 +381,7 @@ type indefiniteExporter[T TestItem] struct{} func (indefiniteExporter[T]) Shutdown(context.Context) error { return nil } func (indefiniteExporter[T]) ExportItems(ctx context.Context, _ []*T) error { <-ctx.Done() + return ctx.Err() } diff --git a/pkg/sentry/event/beacon/eth/v1/events_attestation.go b/pkg/sentry/event/beacon/eth/v1/events_attestation.go index 5ef5fcfa..a96f25e6 100644 --- a/pkg/sentry/event/beacon/eth/v1/events_attestation.go +++ b/pkg/sentry/event/beacon/eth/v1/events_attestation.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "time" @@ -44,7 +45,7 @@ func (e *EventsAttestation) Decorate(ctx context.Context) (*xatu.DecoratedEvent, } if ignore { - return nil, nil + return nil, errors.New("duplicate event") } decoratedEvent := &xatu.DecoratedEvent{ @@ -90,7 +91,7 @@ func (e *EventsAttestation) Decorate(ctx context.Context) (*xatu.DecoratedEvent, func (e *EventsAttestation) shouldIgnore(ctx context.Context) (bool, error) { if err := e.beacon.Synced(ctx); err != nil { - return true, nil + return true, err } hash, err := hashstructure.Hash(e.event, hashstructure.FormatV2, nil) diff --git a/pkg/sentry/event/beacon/eth/v1/events_block.go b/pkg/sentry/event/beacon/eth/v1/events_block.go index de8a8d8e..912e1805 100644 --- a/pkg/sentry/event/beacon/eth/v1/events_block.go +++ b/pkg/sentry/event/beacon/eth/v1/events_block.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "time" @@ -44,7 +45,7 @@ func (e *EventsBlock) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error } if ignore { - return nil, nil + return nil, errors.New("duplicate event") } decoratedEvent := &xatu.DecoratedEvent{ @@ -78,7 +79,7 @@ func (e *EventsBlock) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error func (e *EventsBlock) shouldIgnore(ctx context.Context) (bool, error) { if err := e.beacon.Synced(ctx); err != nil { - return true, nil + return true, err } hash, err := hashstructure.Hash(e.event, hashstructure.FormatV2, nil) diff --git a/pkg/sentry/event/beacon/eth/v1/events_chain_reorg.go b/pkg/sentry/event/beacon/eth/v1/events_chain_reorg.go index 9eb89d0c..b680092b 100644 --- a/pkg/sentry/event/beacon/eth/v1/events_chain_reorg.go +++ b/pkg/sentry/event/beacon/eth/v1/events_chain_reorg.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "time" @@ -44,7 +45,7 @@ func (e *EventsChainReorg) Decorate(ctx context.Context) (*xatu.DecoratedEvent, } if ignore { - return nil, nil + return nil, errors.New("duplicate event") } decoratedEvent := &xatu.DecoratedEvent{ @@ -82,7 +83,7 @@ func (e *EventsChainReorg) Decorate(ctx context.Context) (*xatu.DecoratedEvent, func (e *EventsChainReorg) shouldIgnore(ctx context.Context) (bool, error) { if err := e.beacon.Synced(ctx); err != nil { - return true, nil + return true, err } hash, err := hashstructure.Hash(e.event, hashstructure.FormatV2, nil) diff --git a/pkg/sentry/event/beacon/eth/v1/events_contribution_and_proof.go b/pkg/sentry/event/beacon/eth/v1/events_contribution_and_proof.go index 921e6bed..c179dbd8 100644 --- a/pkg/sentry/event/beacon/eth/v1/events_contribution_and_proof.go +++ b/pkg/sentry/event/beacon/eth/v1/events_contribution_and_proof.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "time" @@ -44,7 +45,7 @@ func (e *EventsContributionAndProof) Decorate(ctx context.Context) (*xatu.Decora } if ignore { - return nil, nil + return nil, errors.New("duplicate event") } decoratedEvent := &xatu.DecoratedEvent{ @@ -87,7 +88,7 @@ func (e *EventsContributionAndProof) Decorate(ctx context.Context) (*xatu.Decora func (e *EventsContributionAndProof) shouldIgnore(ctx context.Context) (bool, error) { if err := e.beacon.Synced(ctx); err != nil { - return true, nil + return true, err } hash, err := hashstructure.Hash(e.event, hashstructure.FormatV2, nil) diff --git a/pkg/sentry/event/beacon/eth/v1/events_finalized_checkpoint.go b/pkg/sentry/event/beacon/eth/v1/events_finalized_checkpoint.go index 1e8f5a77..0aef0ace 100644 --- a/pkg/sentry/event/beacon/eth/v1/events_finalized_checkpoint.go +++ b/pkg/sentry/event/beacon/eth/v1/events_finalized_checkpoint.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "time" @@ -44,7 +45,7 @@ func (e *EventsFinalizedCheckpoint) Decorate(ctx context.Context) (*xatu.Decorat } if ignore { - return nil, nil + return nil, errors.New("duplicate event") } decoratedEvent := &xatu.DecoratedEvent{ @@ -78,7 +79,7 @@ func (e *EventsFinalizedCheckpoint) Decorate(ctx context.Context) (*xatu.Decorat func (e *EventsFinalizedCheckpoint) shouldIgnore(ctx context.Context) (bool, error) { if err := e.beacon.Synced(ctx); err != nil { - return true, nil + return true, err } hash, err := hashstructure.Hash(e.event, hashstructure.FormatV2, nil) diff --git a/pkg/sentry/event/beacon/eth/v1/events_head.go b/pkg/sentry/event/beacon/eth/v1/events_head.go index 942f21fd..1bf11474 100644 --- a/pkg/sentry/event/beacon/eth/v1/events_head.go +++ b/pkg/sentry/event/beacon/eth/v1/events_head.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "time" @@ -44,7 +45,7 @@ func (e *EventsHead) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error) } if ignore { - return nil, nil + return nil, errors.New("duplicate event") } decoratedEvent := &xatu.DecoratedEvent{ @@ -81,7 +82,7 @@ func (e *EventsHead) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error) func (e *EventsHead) shouldIgnore(ctx context.Context) (bool, error) { if err := e.beacon.Synced(ctx); err != nil { - return true, nil + return true, err } hash, err := hashstructure.Hash(e.event, hashstructure.FormatV2, nil) diff --git a/pkg/sentry/event/beacon/eth/v1/events_voluntary_exit.go b/pkg/sentry/event/beacon/eth/v1/events_voluntary_exit.go index 5cf3fc87..773e7511 100644 --- a/pkg/sentry/event/beacon/eth/v1/events_voluntary_exit.go +++ b/pkg/sentry/event/beacon/eth/v1/events_voluntary_exit.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "time" @@ -44,7 +45,7 @@ func (e *EventsVoluntaryExit) Decorate(ctx context.Context) (*xatu.DecoratedEven } if ignore { - return nil, nil + return nil, err } decoratedEvent := &xatu.DecoratedEvent{ @@ -77,7 +78,7 @@ func (e *EventsVoluntaryExit) Decorate(ctx context.Context) (*xatu.DecoratedEven func (e *EventsVoluntaryExit) shouldIgnore(ctx context.Context) (bool, error) { if err := e.beacon.Synced(ctx); err != nil { - return true, nil + return true, errors.New("duplicate event") } hash, err := hashstructure.Hash(e.event, hashstructure.FormatV2, nil) diff --git a/pkg/sentry/event/beacon/eth/v2/beacon_block.go b/pkg/sentry/event/beacon/eth/v2/beacon_block.go index 67eff133..7207a042 100644 --- a/pkg/sentry/event/beacon/eth/v2/beacon_block.go +++ b/pkg/sentry/event/beacon/eth/v2/beacon_block.go @@ -2,6 +2,7 @@ package event import ( "context" + "errors" "fmt" "time" @@ -48,7 +49,7 @@ func (e *BeaconBlock) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error } if ignore { - return nil, nil + return nil, errors.New("duplicate event") } var data *xatuethv2.EventBlock @@ -97,7 +98,7 @@ func (e *BeaconBlock) shouldIgnore(ctx context.Context) (bool, error) { } if err := e.beacon.Synced(ctx); err != nil { - return true, nil + return true, err } hash, err := hashstructure.Hash(e.event, hashstructure.FormatV2, nil) diff --git a/pkg/server/config.go b/pkg/server/config.go index 236cfdfa..f6b77635 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -11,9 +11,9 @@ type Config struct { // The address to listen on. Addr string `yaml:"addr" default:":8080"` // MetricsAddr is the address to listen on for metrics. - MetricsAddr string `yaml:"metrics_addr" default:":9090"` + MetricsAddr string `yaml:"metricsAddr" default:":9090"` // LoggingLevel is the logging level to use. - LoggingLevel string `yaml:"logging_level" default:"info"` + LoggingLevel string `yaml:"logging" default:"info"` // NTP Server to use for clock drift correction NTPServer string `yaml:"ntpServer" default:"time.google.com"` diff --git a/pkg/server/geoip/maxmind/maxmind.go b/pkg/server/geoip/maxmind/maxmind.go index 6dedab4b..7df75e4d 100644 --- a/pkg/server/geoip/maxmind/maxmind.go +++ b/pkg/server/geoip/maxmind/maxmind.go @@ -77,6 +77,7 @@ func (m *Maxmind) LookupIP(ctx context.Context, ip net.IP) (*lookup.Result, erro city, err := m.city.Lookup(ip) if err != nil { m.metrics.AddLookupIP(1, m.Type(), "error") + return nil, err } @@ -94,6 +95,7 @@ func (m *Maxmind) LookupIP(ctx context.Context, ip net.IP) (*lookup.Result, erro asn, err := m.asn.Lookup(ip) if err != nil { m.metrics.AddLookupIP(1, m.Type(), "error") + return nil, err } diff --git a/pkg/server/geoip/raw_message.go b/pkg/server/geoip/raw_message.go index a43709e0..90289bd0 100644 --- a/pkg/server/geoip/raw_message.go +++ b/pkg/server/geoip/raw_message.go @@ -6,6 +6,7 @@ type RawMessage struct { func (r *RawMessage) UnmarshalYAML(unmarshal func(interface{}) error) error { r.unmarshal = unmarshal + return nil } diff --git a/pkg/server/metrics.go b/pkg/server/metrics.go deleted file mode 100644 index d2d8068d..00000000 --- a/pkg/server/metrics.go +++ /dev/null @@ -1,10 +0,0 @@ -package server - -type Metrics struct { -} - -func NewMetrics(namespace string) *Metrics { - m := &Metrics{} - - return m -} diff --git a/pkg/server/persistence/node/activity.go b/pkg/server/persistence/node/activity.go index a76ee2df..e4795a30 100644 --- a/pkg/server/persistence/node/activity.go +++ b/pkg/server/persistence/node/activity.go @@ -4,15 +4,15 @@ import "time" type Activity struct { // ActivityID is the activity id. - ActivityID interface{} `json:"activity_id" db:"activity_id"` + ActivityID interface{} `json:"activityId" db:"activity_id"` // Enr is the enr of the node record. Enr string `json:"enr" db:"enr" fieldopt:"omitempty"` // Client is the name of the coordinated client. - ClientID string `json:"client_id" db:"client_id" fieldopt:"omitempty"` + ClientID string `json:"clientId" db:"client_id" fieldopt:"omitempty"` // CreateTime is the timestamp of when the activity record was created. - CreateTime time.Time `json:"create_time" db:"create_time" fieldopt:"omitempty"` + CreateTime time.Time `json:"createTime" db:"create_time" fieldopt:"omitempty"` // UpdateTime is the timestamp of when the activity record was updated. - UpdateTime time.Time `json:"update_time" db:"update_time" fieldopt:"omitempty"` + UpdateTime time.Time `json:"updateTime" db:"update_time" fieldopt:"omitempty"` // Connected is the connected status of the node. Connected bool `json:"connected" db:"connected" fieldopt:"omitempty"` } diff --git a/pkg/server/persistence/node/execution.go b/pkg/server/persistence/node/execution.go index bdfa70fb..8dc997a9 100644 --- a/pkg/server/persistence/node/execution.go +++ b/pkg/server/persistence/node/execution.go @@ -4,27 +4,27 @@ import "time" type Execution struct { // ExecutionID is the execution id. - ExecutionID int64 `json:"execution_id" db:"execution_id"` + ExecutionID int64 `json:"executionId" db:"execution_id"` // Enr is the enr of the node record. Enr string `json:"enr" db:"enr" fieldopt:"omitempty"` // CreateTime is the timestamp of when the execution record was created. - CreateTime time.Time `json:"create_time" db:"create_time" fieldopt:"omitempty"` + CreateTime time.Time `json:"createTime" db:"create_time" fieldopt:"omitempty"` // Name is the name of the node. Name string `json:"name" db:"name" fieldopt:"omitempty"` // Capabilities is the capabilities of the node. Capabilities string `json:"capabilities" db:"capabilities" fieldopt:"omitempty"` // ProtocolVersion is the protocol version of the node. - ProtocolVersion string `json:"protocol_version" db:"protocol_version" fieldopt:"omitempty"` + ProtocolVersion string `json:"protocolVersion" db:"protocol_version" fieldopt:"omitempty"` // NetworkId is the network id of the node. - NetworkID string `json:"network_id" db:"network_id" fieldopt:"omitempty"` + NetworkID string `json:"networkId" db:"network_id" fieldopt:"omitempty"` // TD is the total difficulty of the node. - TotalDifficulty string `json:"total_difficulty" db:"total_difficulty" fieldopt:"omitempty"` + TotalDifficulty string `json:"totalDifficulty" db:"total_difficulty" fieldopt:"omitempty"` // Head is the head of the node. Head []byte `json:"head" db:"head" fieldopt:"omitempty"` // Genesis is the genesis of the node. Genesis []byte `json:"genesis" db:"genesis" fieldopt:"omitempty"` // ForkIdHash is the fork id hash of the node. - ForkIDHash []byte `json:"fork_id_hash" db:"fork_id_hash" fieldopt:"omitempty"` + ForkIDHash []byte `json:"forkIdHash" db:"fork_id_hash" fieldopt:"omitempty"` // ForkIdNext is the fork id next of the node. - ForkIDNext string `json:"fork_id_next" db:"fork_id_next" fieldopt:"omitempty"` + ForkIDNext string `json:"forkIdNext" db:"fork_id_next" fieldopt:"omitempty"` } diff --git a/pkg/server/persistence/node/parse.go b/pkg/server/persistence/node/parse.go index 7c7fab5b..afb3ceaa 100644 --- a/pkg/server/persistence/node/parse.go +++ b/pkg/server/persistence/node/parse.go @@ -55,11 +55,13 @@ func Parse(record string) (*Record, error) { func parseSignature(node *enode.Node) *[]byte { signature := node.Record().Signature() + return &signature } func parseSeq(node *enode.Node) *uint64 { seq := node.Seq() + return &seq } diff --git a/pkg/server/persistence/node/record.go b/pkg/server/persistence/node/record.go index 09bd915d..6f7164a8 100644 --- a/pkg/server/persistence/node/record.go +++ b/pkg/server/persistence/node/record.go @@ -13,13 +13,13 @@ type Record struct { // Seq is the sequence number, a 64-bit unsigned integer. Nodes should increase the number whenever the record changes and republish the record Seq *uint64 `json:"seq" db:"seq" fieldopt:"omitempty"` // CreateTime is the timestamp of when the node record was created. - CreateTime time.Time `json:"create_time" db:"create_time" fieldopt:"omitempty"` + CreateTime time.Time `json:"createTime" db:"create_time" fieldopt:"omitempty"` // LastDialTime is the timestamp of when the node record was last dialed. - LastDialTime sql.NullTime `json:"last_dial_time" db:"last_dial_time" fieldopt:"omitempty"` + LastDialTime sql.NullTime `json:"lastDialTime" db:"last_dial_time" fieldopt:"omitempty"` // ConsecutiveDialAttempts is the number of consecutive dial attempts. - ConsecutiveDialAttempts int `json:"consecutive_dial_attempts" db:"consecutive_dial_attempts"` + ConsecutiveDialAttempts int `json:"consecutiveDialAttempts" db:"consecutive_dial_attempts"` // LastConnectTime is the timestamp of when the node record was last connected. - LastConnectTime sql.NullTime `json:"last_connect_time" db:"last_connect_time" fieldopt:"omitempty"` + LastConnectTime sql.NullTime `json:"lastConnectTime" db:"last_connect_time" fieldopt:"omitempty"` // ID is the name of identity scheme, e.g. “v4” ID *string `json:"id" db:"id" fieldopt:"omitempty"` // Secp256k1 is the secp256k1 public key of the node record. @@ -43,7 +43,7 @@ type Record struct { // Syncnets is the sync subnet bitfield of the node record. Syncnets *[]byte `json:"syncnets" db:"syncnets" fieldopt:"omitempty"` // NodeID is the node ID of the node record. - NodeID *string `json:"node_id" db:"node_id" fieldopt:"omitempty"` + NodeID *string `json:"nodeId" db:"node_id" fieldopt:"omitempty"` // PeerID is the peer ID of the node record. - PeerID *string `json:"peer_id" db:"peer_id" fieldopt:"omitempty"` + PeerID *string `json:"peerId" db:"peer_id" fieldopt:"omitempty"` } diff --git a/pkg/server/server.go b/pkg/server/server.go index 964996aa..963d8a80 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -26,10 +26,8 @@ import ( ) type Xatu struct { - ctx context.Context - log logrus.FieldLogger - config *Config - metrics *Metrics + log logrus.FieldLogger + config *Config services []service.GRPCService @@ -82,7 +80,6 @@ func NewXatu(ctx context.Context, log logrus.FieldLogger, conf *Config) (*Xatu, return &Xatu{ config: conf, log: log.WithField("component", "server"), - ctx: ctx, persistence: p, cache: c, geoipProvider: g, @@ -123,12 +120,14 @@ func (x *Xatu) Start(ctx context.Context) error { return err } } + return nil }) g.Go(func() error { if err := x.startGrpcServer(ctx); err != nil { return err } + return nil }) g.Go(func() error { @@ -136,6 +135,7 @@ func (x *Xatu) Start(ctx context.Context) error { if err := x.stop(ctx); err != nil { return err } + return nil }) diff --git a/pkg/server/service/coordinator/client.go b/pkg/server/service/coordinator/client.go index f946163a..d92c4579 100644 --- a/pkg/server/service/coordinator/client.go +++ b/pkg/server/service/coordinator/client.go @@ -105,7 +105,7 @@ func (c *Client) ListStalledExecutionNodeRecords(ctx context.Context, req *xatu. pageSize = 1000 } - nodeRecords, err := c.persistence.CheckoutStalledExecutionNodeRecords(ctx, int(req.PageSize)) + nodeRecords, err := c.persistence.CheckoutStalledExecutionNodeRecords(ctx, pageSize) if err != nil { return nil, err } diff --git a/pkg/server/service/event-ingester/event/beacon/eth/v2/beacon_block.go b/pkg/server/service/event-ingester/event/beacon/eth/v2/beacon_block.go index 86e1cf13..5ad247f5 100644 --- a/pkg/server/service/event-ingester/event/beacon/eth/v2/beacon_block.go +++ b/pkg/server/service/event-ingester/event/beacon/eth/v2/beacon_block.go @@ -48,18 +48,21 @@ func (b *BeaconBlock) Filter(ctx context.Context) bool { data, ok := b.event.Data.(*xatu.DecoratedEvent_EthV2BeaconBlock) if !ok { b.log.Error("failed to cast event data") + return true } additionalData, ok := b.event.Meta.Client.AdditionalData.(*xatu.ClientMeta_EthV2BeaconBlock) if !ok { b.log.Error("failed to cast client additional data") + return true } version := additionalData.EthV2BeaconBlock.GetVersion() if version == "" { b.log.Error("failed to get version") + return true } @@ -76,11 +79,13 @@ func (b *BeaconBlock) Filter(ctx context.Context) bool { hash = data.EthV2BeaconBlock.Message.(*v2.EventBlock_CapellaBlock).CapellaBlock.StateRoot default: b.log.Error(fmt.Errorf("unknown version: %s", version)) + return true } if hash == "" { b.log.Error("failed to get hash") + return true } @@ -89,6 +94,7 @@ func (b *BeaconBlock) Filter(ctx context.Context) bool { _, retrieved, err := b.cache.GetOrSet(ctx, key, version, time.Minute*30) if err != nil { b.log.WithError(err).Error("failed to retrieve from cache") + return true } diff --git a/pkg/server/service/event-ingester/handler.go b/pkg/server/service/event-ingester/handler.go index 419629ce..2954c4a3 100644 --- a/pkg/server/service/event-ingester/handler.go +++ b/pkg/server/service/event-ingester/handler.go @@ -134,16 +134,19 @@ func (h *Handler) Events(ctx context.Context, clientID string, events []*xatu.De e, err := eventHandler.New(eventHandler.Type(eventName), h.log, event, h.cache) if err != nil { h.log.WithError(err).WithField("event", eventName).Warn("failed to create event handler") + continue } if err := e.Validate(ctx); err != nil { h.log.WithError(err).WithField("event", eventName).Warn("failed to validate event") + continue } if shouldFilter := e.Filter(ctx); shouldFilter { h.log.WithField("event", eventName).Debug("event filtered") + continue } diff --git a/pkg/server/service/raw_message.go b/pkg/server/service/raw_message.go index 537df0ba..6c0db254 100644 --- a/pkg/server/service/raw_message.go +++ b/pkg/server/service/raw_message.go @@ -6,6 +6,7 @@ type RawMessage struct { func (r *RawMessage) UnmarshalYAML(unmarshal func(interface{}) error) error { r.unmarshal = unmarshal + return nil } diff --git a/pkg/server/store/memory/memory.go b/pkg/server/store/memory/memory.go index 382b5beb..9387eece 100644 --- a/pkg/server/store/memory/memory.go +++ b/pkg/server/store/memory/memory.go @@ -46,6 +46,7 @@ func (m *Memory) Get(ctx context.Context, key string) (*string, error) { if item == nil { m.metrics.AddGet(1, m.Type(), "miss") + //nolint:nilnil // expected behavior return nil, nil } @@ -77,6 +78,7 @@ func (m *Memory) GetAndDelete(ctx context.Context, key string) (deletedValue *st if !exists { m.metrics.AddGet(1, m.Type(), "miss") + return } diff --git a/pkg/server/store/raw_message.go b/pkg/server/store/raw_message.go index 9749dfb3..c680774d 100644 --- a/pkg/server/store/raw_message.go +++ b/pkg/server/store/raw_message.go @@ -6,6 +6,7 @@ type RawMessage struct { func (r *RawMessage) UnmarshalYAML(unmarshal func(interface{}) error) error { r.unmarshal = unmarshal + return nil } diff --git a/pkg/server/store/redis/cluster/cluster.go b/pkg/server/store/redis/cluster/cluster.go index 5feb46f4..fe1b993c 100644 --- a/pkg/server/store/redis/cluster/cluster.go +++ b/pkg/server/store/redis/cluster/cluster.go @@ -67,6 +67,7 @@ func (c *Cluster) Get(ctx context.Context, key string) (*string, error) { if cmd.Err() != nil { if errors.Is(cmd.Err(), redis.Nil) { c.metrics.AddGet(1, c.Type(), "miss") + //nolint:nilnil // expected behavior return nil, nil } @@ -92,6 +93,7 @@ func (c *Cluster) GetOrSet(ctx context.Context, key, value string, ttl time.Dura if storedValue != nil { retrieved = true + return } @@ -129,6 +131,7 @@ func (c *Cluster) Set(ctx context.Context, key, value string, ttl time.Duration) if cmd.Err() != nil { c.metrics.AddSet(1, c.Type(), "error") + return cmd.Err() } @@ -143,6 +146,7 @@ func (c *Cluster) Delete(ctx context.Context, key string) error { if cmd.Err() != nil { c.metrics.AddDelete(1, c.Type(), "error") + return cmd.Err() } diff --git a/pkg/server/store/redis/server/server.go b/pkg/server/store/redis/server/server.go index a68873ac..31c000fe 100644 --- a/pkg/server/store/redis/server/server.go +++ b/pkg/server/store/redis/server/server.go @@ -67,7 +67,7 @@ func (s *Server) Get(ctx context.Context, key string) (*string, error) { if cmd.Err() != nil { if errors.Is(cmd.Err(), redis.Nil) { s.metrics.AddGet(1, s.Type(), "miss") - + //nolint:nilnil // expected behavior return nil, nil } @@ -93,6 +93,7 @@ func (s *Server) GetOrSet(ctx context.Context, key, value string, ttl time.Durat if storedValue != nil { retrieved = true + return } @@ -130,6 +131,7 @@ func (s *Server) Set(ctx context.Context, key, value string, ttl time.Duration) if cmd.Err() != nil { s.metrics.AddSet(1, s.Type(), "error") + return cmd.Err() } @@ -144,6 +146,7 @@ func (s *Server) Delete(ctx context.Context, key string) error { if cmd.Err() != nil { s.metrics.AddDelete(1, s.Type(), "error") + return cmd.Err() }