diff --git a/pkg/server/persistence/cannon/location.go b/pkg/server/persistence/cannon/location.go index e710a86c..c55439b4 100644 --- a/pkg/server/persistence/cannon/location.go +++ b/pkg/server/persistence/cannon/location.go @@ -1,6 +1,7 @@ package cannon import ( + "errors" "fmt" "time" @@ -23,6 +24,11 @@ type Location struct { Value string `json:"value" db:"value"` } +var ( + ErrFailedToMarshal = errors.New("failed to marshal location") + ErrFailedToUnmarshal = errors.New("failed to unmarshal location") +) + // MarshalValueFromProto marshals a proto message into the Value field. func (l *Location) Marshal(msg *xatu.CannonLocation) error { l.NetworkID = msg.NetworkId @@ -35,7 +41,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -46,7 +52,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -57,7 +63,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -68,7 +74,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -80,7 +86,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -92,7 +98,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -103,7 +109,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -115,7 +121,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -127,7 +133,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -139,7 +145,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -150,7 +156,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -161,7 +167,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -172,7 +178,7 @@ func (l *Location) Marshal(msg *xatu.CannonLocation) error { b, err := protojson.Marshal(data) if err != nil { - return err + return fmt.Errorf("%w: %s", ErrFailedToMarshal, err) } l.Value = string(b) @@ -196,7 +202,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV2BeaconBlockVoluntaryExit{ @@ -209,7 +215,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV2BeaconBlockProposerSlashing{ @@ -222,7 +228,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV2BeaconBlockDeposit{ @@ -235,7 +241,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV2BeaconBlockAttesterSlashing{ @@ -249,7 +255,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV2BeaconBlockExecutionTransaction{ @@ -263,7 +269,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV2BeaconBlockBlsToExecutionChange{ @@ -277,7 +283,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV2BeaconBlockWithdrawal{ @@ -290,7 +296,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV2BeaconBlock{ @@ -303,7 +309,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_BlockprintBlockClassification{ @@ -316,7 +322,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV1BeaconBlobSidecar{ @@ -329,7 +335,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV1BeaconProposerDuty{ @@ -342,7 +348,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV2BeaconBlockElaboratedAttestation{ @@ -355,7 +361,7 @@ func (l *Location) Unmarshal() (*xatu.CannonLocation, error) { err := protojson.Unmarshal([]byte(l.Value), data) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %s", ErrFailedToUnmarshal, err) } msg.Data = &xatu.CannonLocation_EthV1BeaconValidators{ diff --git a/pkg/server/persistence/cannon_location.go b/pkg/server/persistence/cannon_location.go index 306f4137..c12296ee 100644 --- a/pkg/server/persistence/cannon_location.go +++ b/pkg/server/persistence/cannon_location.go @@ -5,6 +5,8 @@ import ( "errors" "time" + perrors "github.com/pkg/errors" + "github.com/ethpandaops/xatu/pkg/server/persistence/cannon" "github.com/huandu/go-sqlbuilder" ) @@ -41,7 +43,7 @@ func (c *Client) GetCannonLocationByID(ctx context.Context, id int64) (*cannon.L rows, err := c.db.QueryContext(ctx, sql, args...) if err != nil { - return nil, err + return nil, perrors.Wrap(err, "db query failed") } defer rows.Close() @@ -53,7 +55,7 @@ func (c *Client) GetCannonLocationByID(ctx context.Context, id int64) (*cannon.L err = rows.Scan(cannonLocationStruct.Addr(&location)...) if err != nil { - return nil, err + return nil, perrors.Wrap(err, "db scan failed") } locations = append(locations, &location) @@ -76,7 +78,7 @@ func (c *Client) GetCannonLocationByNetworkIDAndType(ctx context.Context, networ rows, err := c.db.QueryContext(ctx, sql, args...) if err != nil { - return nil, err + return nil, perrors.Wrap(err, "db query failed") } defer rows.Close() @@ -88,7 +90,7 @@ func (c *Client) GetCannonLocationByNetworkIDAndType(ctx context.Context, networ err = rows.Scan(cannonLocationStruct.Addr(&location)...) if err != nil { - return nil, err + return nil, perrors.Wrap(err, "db scan failed") } locations = append(locations, &location) diff --git a/pkg/server/service/coordinator/client.go b/pkg/server/service/coordinator/client.go index f03cc94a..c5f118a6 100644 --- a/pkg/server/service/coordinator/client.go +++ b/pkg/server/service/coordinator/client.go @@ -9,6 +9,8 @@ import ( "strings" "time" + perrors "github.com/pkg/errors" + "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/ethpandaops/xatu/pkg/server/geoip" "github.com/ethpandaops/xatu/pkg/server/persistence" @@ -68,7 +70,7 @@ func (c *Client) Start(ctx context.Context, grpcServer *grpc.Server) error { xatu.RegisterCoordinatorServer(grpcServer, c) if err := c.nodeRecord.Start(ctx); err != nil { - return status.Error(codes.Internal, err.Error()) + return status.Error(codes.Internal, perrors.Wrap(err, "failed to start node record processor").Error()) } return nil @@ -93,7 +95,7 @@ func (c *Client) CreateNodeRecords(ctx context.Context, req *xatu.CreateNodeReco pRecord, err := node.Parse(record) if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + return nil, status.Error(codes.InvalidArgument, perrors.Wrap(err, "failed to parse node record").Error()) } if c.geoipProvider != nil { @@ -142,7 +144,7 @@ func (c *Client) ListStalledExecutionNodeRecords(ctx context.Context, req *xatu. nodeRecords, err := c.persistence.CheckoutStalledExecutionNodeRecords(ctx, pageSize) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, perrors.Wrap(err, "failed to get stalled execution node records from db").Error()) } response := &xatu.ListStalledExecutionNodeRecordsResponse{ @@ -194,7 +196,7 @@ func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu. err := c.persistence.InsertNodeRecordExecution(ctx, &st) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, perrors.Wrap(err, "failed to insert node record execution").Error()) } result = "success" @@ -207,7 +209,7 @@ func (c *Client) CreateExecutionNodeRecordStatus(ctx context.Context, req *xatu. err = c.persistence.UpdateNodeRecord(ctx, nodeRecord) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, perrors.Wrap(err, "failed to update node record").Error()) } return &xatu.CreateExecutionNodeRecordStatusResponse{}, nil @@ -254,7 +256,7 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C if limit > 0 { newNodeRecords, err := c.persistence.ListAvailableExecutionNodeRecords(ctx, req.ClientId, ignoredNodeRecords, req.NetworkIds, req.ForkIdHashes, req.Capabilities, int(limit)) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, perrors.Wrap(err, "failed to get available execution node records from db").Error()) } for _, record := range newNodeRecords { @@ -273,7 +275,7 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C if len(activities) != 0 { err := c.persistence.UpsertNodeRecordActivities(ctx, activities) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, perrors.Wrap(err, "failed to upsert node record activities to db").Error()) } } @@ -286,7 +288,7 @@ func (c *Client) CoordinateExecutionNodeRecords(ctx context.Context, req *xatu.C func (c *Client) GetDiscoveryNodeRecord(ctx context.Context, req *xatu.GetDiscoveryNodeRecordRequest) (*xatu.GetDiscoveryNodeRecordResponse, error) { records, err := c.persistence.ListNodeRecordExecutions(ctx, req.NetworkIds, req.ForkIdHashes, 100) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, perrors.Wrap(err, "failed to get discovery node records from db").Error()) } if len(records) == 0 { @@ -304,7 +306,7 @@ func (c *Client) GetDiscoveryNodeRecord(ctx context.Context, req *xatu.GetDiscov func (c *Client) GetCannonLocation(ctx context.Context, req *xatu.GetCannonLocationRequest) (*xatu.GetCannonLocationResponse, error) { location, err := c.persistence.GetCannonLocationByNetworkIDAndType(ctx, req.NetworkId, req.Type.Enum().String()) if err != nil && err != persistence.ErrCannonLocationNotFound { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, perrors.Wrap(err, "failed to get cannon location from db").Error()) } rsp := &xatu.GetCannonLocationResponse{} @@ -315,7 +317,7 @@ func (c *Client) GetCannonLocation(ctx context.Context, req *xatu.GetCannonLocat protoLoc, err := location.Unmarshal() if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, perrors.Wrap(err, "failed to unmarshal cannon location").Error()) } return &xatu.GetCannonLocationResponse{ @@ -328,12 +330,12 @@ func (c *Client) UpsertCannonLocation(ctx context.Context, req *xatu.UpsertCanno err := newLocation.Marshal(req.Location) if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + return nil, status.Error(codes.InvalidArgument, perrors.Wrap(err, "failed to marshal cannon location").Error()) } err = c.persistence.UpsertCannonLocation(ctx, newLocation) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, perrors.Wrap(err, "failed to upsert cannon location to db").Error()) } return &xatu.UpsertCannonLocationResponse{}, nil