Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Add SSE subscription to builder (#53)
Browse files Browse the repository at this point in the history
* Add SSE subscription to builder

* withdrawals marshalling

* add stop channel

* pr comments

* Add handling multiple beacon clients (#57)

* Add handling multiple beacon clients

* Initialize stop channel in builder.Builder

* fix withdrawals array pointer

* Build on a single head (#59)

* Build on a single head

* Forcibly stop building process for old sse events

---------

Co-authored-by: avalonche <[email protected]>

* linting

---------

Co-authored-by: Mateusz Morusiewicz <[email protected]>
  • Loading branch information
avalonche and Ruteri authored Apr 10, 2023
1 parent 470a183 commit d3b034a
Show file tree
Hide file tree
Showing 24 changed files with 302 additions and 79 deletions.
2 changes: 1 addition & 1 deletion accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewSimulatedBackendChain(database ethdb.Database, blockchain *core.BlockCha
filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

header := backend.blockchain.CurrentBlock()
block := backend.blockchain.GetBlock(header.Hash(), header.Number.Uint64())

Expand Down
194 changes: 188 additions & 6 deletions builder/beacon_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package builder

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -10,11 +11,23 @@ import (
"sync"
"time"

"github.com/attestantio/go-eth2-client/spec/capella"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/r3labs/sse"
"golang.org/x/exp/slices"
)

type IBeaconClient interface {
isValidator(pubkey PubkeyHex) bool
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes)
Start() error
Stop()
}

type testBeaconClient struct {
validator *ValidatorPrivateData
slot uint64
Expand All @@ -30,8 +43,110 @@ func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool {
func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil
}
func (b *testBeaconClient) Start() error {
return nil

func (b *testBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
}

func (b *testBeaconClient) Start() error { return nil }

type NilBeaconClient struct{}

func (b *NilBeaconClient) isValidator(pubkey PubkeyHex) bool {
return false
}

func (b *NilBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
return PubkeyHex(""), nil
}

func (b *NilBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
}

func (b *NilBeaconClient) Start() error { return nil }

func (b *NilBeaconClient) Stop() {}

type MultiBeaconClient struct {
clients []*BeaconClient
closeCh chan struct{}
}

func NewMultiBeaconClient(endpoints []string, slotsInEpoch uint64, secondsInSlot uint64) *MultiBeaconClient {
clients := []*BeaconClient{}
for _, endpoint := range endpoints {
client := NewBeaconClient(endpoint, slotsInEpoch, secondsInSlot)
clients = append(clients, client)
}

return &MultiBeaconClient{
clients: clients,
closeCh: make(chan struct{}),
}
}

func (m *MultiBeaconClient) isValidator(pubkey PubkeyHex) bool {
for _, c := range m.clients {
// Pick the first one, always true
return c.isValidator(pubkey)
}

return false
}

func (m *MultiBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
var allErrs error
for _, c := range m.clients {
pk, err := c.getProposerForNextSlot(requestedSlot)
if err != nil {
allErrs = errors.Join(allErrs, err)
continue
}

return pk, nil
}
return PubkeyHex(""), allErrs
}

func payloadAttributesMatch(l types.BuilderPayloadAttributes, r types.BuilderPayloadAttributes) bool {
if l.Timestamp != r.Timestamp ||
l.Random != r.Random ||
l.SuggestedFeeRecipient != r.SuggestedFeeRecipient ||
l.Slot != r.Slot ||
l.HeadHash != r.HeadHash ||
l.GasLimit != r.GasLimit {
return false
}

if !slices.Equal(l.Withdrawals, r.Withdrawals) {
return false
}

return true
}

func (m *MultiBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
for _, c := range m.clients {
go c.SubscribeToPayloadAttributesEvents(payloadAttrC)
}
}

func (m *MultiBeaconClient) Start() error {
var allErrs error
for _, c := range m.clients {
err := c.Start()
if err != nil {
allErrs = errors.Join(allErrs, err)
}
}
return allErrs
}

func (m *MultiBeaconClient) Stop() {
for _, c := range m.clients {
c.Stop()
}

close(m.closeCh)
}

type BeaconClient struct {
Expand All @@ -42,21 +157,24 @@ type BeaconClient struct {
mu sync.Mutex
slotProposerMap map[uint64]PubkeyHex

closeCh chan struct{}
ctx context.Context
cancelFn context.CancelFunc
}

func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient {
ctx, cancelFn := context.WithCancel(context.Background())
return &BeaconClient{
endpoint: endpoint,
slotsInEpoch: slotsInEpoch,
secondsInSlot: secondsInSlot,
slotProposerMap: make(map[uint64]PubkeyHex),
closeCh: make(chan struct{}),
ctx: ctx,
cancelFn: cancelFn,
}
}

func (b *BeaconClient) Stop() {
close(b.closeCh)
b.cancelFn()
}

func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool {
Expand Down Expand Up @@ -109,7 +227,7 @@ func (b *BeaconClient) UpdateValidatorMapForever() {
defer timer.Stop()
for true {
select {
case <-b.closeCh:
case <-b.ctx.Done():
return
case <-timer.C:
}
Expand Down Expand Up @@ -154,6 +272,70 @@ func (b *BeaconClient) UpdateValidatorMapForever() {
}
}

// PayloadAttributesEvent represents the data of a payload_attributes event
// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}}
type PayloadAttributesEvent struct {
Version string `json:"version"`
Data PayloadAttributesEventData `json:"data"`
}

type PayloadAttributesEventData struct {
ProposalSlot uint64 `json:"proposal_slot,string"`
ParentBlockHash common.Hash `json:"parent_block_hash"`
PayloadAttributes PayloadAttributes `json:"payload_attributes"`
}

type PayloadAttributes struct {
Timestamp uint64 `json:"timestamp,string"`
PrevRandao common.Hash `json:"prev_randao"`
SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"`
Withdrawals []*capella.Withdrawal `json:"withdrawals"`
}

// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals
func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) {
payloadAttributesResp := new(PayloadAttributesEvent)

eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", b.endpoint)
log.Info("subscribing to payload_attributes events")

for {
client := sse.NewClient(eventsURL)
err := client.SubscribeRawWithContext(b.ctx, func(msg *sse.Event) {
err := json.Unmarshal(msg.Data, payloadAttributesResp)
if err != nil {
log.Error("could not unmarshal payload_attributes event", "err", err)
} else {
// convert capella.Withdrawal to types.Withdrawal
var withdrawals []*types.Withdrawal
for _, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals {
withdrawals = append(withdrawals, &types.Withdrawal{
Index: uint64(w.Index),
Validator: uint64(w.ValidatorIndex),
Address: common.Address(w.Address),
Amount: uint64(w.Amount),
})
}

data := types.BuilderPayloadAttributes{
Slot: payloadAttributesResp.Data.ProposalSlot,
HeadHash: payloadAttributesResp.Data.ParentBlockHash,
Timestamp: hexutil.Uint64(payloadAttributesResp.Data.PayloadAttributes.Timestamp),
Random: payloadAttributesResp.Data.PayloadAttributes.PrevRandao,
SuggestedFeeRecipient: payloadAttributesResp.Data.PayloadAttributes.SuggestedFeeRecipient,
Withdrawals: withdrawals,
}
payloadAttrC <- data
}
})
if err != nil {
log.Error("failed to subscribe to payload_attributes events", "err", err)
time.Sleep(1 * time.Second)
}
log.Warn("beaconclient SubscribeRaw ended, reconnecting")
}
}

func fetchCurrentSlot(endpoint string) (uint64, error) {
headerRes := &struct {
Data []struct {
Expand Down
60 changes: 43 additions & 17 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ type ValidatorData struct {
GasLimit uint64
}

type IBeaconClient interface {
isValidator(pubkey PubkeyHex) bool
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
Start() error
Stop()
}

type IRelay interface {
SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, vd ValidatorData) error
SubmitBlockCapella(msg *capellaapi.SubmitBlockRequest, vd ValidatorData) error
Expand All @@ -64,6 +57,7 @@ type Builder struct {
eth IEthereumService
dryRun bool
validator *blockvalidation.BlockValidationAPI
beaconClient IBeaconClient
builderSecretKey *bls.SecretKey
builderPublicKey boostTypes.PublicKey
builderSigningDomain boostTypes.Domain
Expand All @@ -75,9 +69,11 @@ type Builder struct {
slotAttrs []types.BuilderPayloadAttributes
slotCtx context.Context
slotCtxCancel context.CancelFunc

stop chan struct{}
}

func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService, dryRun bool, validator *blockvalidation.BlockValidationAPI) *Builder {
func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService, dryRun bool, validator *blockvalidation.BlockValidationAPI, beaconClient IBeaconClient) *Builder {
pkBytes := bls.PublicKeyFromSecretKey(sk).Compress()
pk := boostTypes.PublicKey{}
pk.FromSlice(pkBytes)
Expand All @@ -89,6 +85,7 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
eth: eth,
dryRun: dryRun,
validator: validator,
beaconClient: beaconClient,
builderSecretKey: sk,
builderPublicKey: pk,
builderSigningDomain: builderSigningDomain,
Expand All @@ -97,14 +94,42 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
slot: 0,
slotCtx: slotCtx,
slotCtxCancel: slotCtxCancel,

stop: make(chan struct{}, 1),
}
}

func (b *Builder) Start() error {
// Start regular payload attributes updates
go func() {
c := make(chan types.BuilderPayloadAttributes)
go b.beaconClient.SubscribeToPayloadAttributesEvents(c)

currentSlot := uint64(0)

for {
select {
case <-b.stop:
return
case payloadAttributes := <-c:
// Right now we are building only on a single head. This might change in the future!
if payloadAttributes.Slot < currentSlot {
continue
} else if payloadAttributes.Slot == currentSlot {
b.OnPayloadAttribute(&payloadAttributes)
} else if payloadAttributes.Slot > currentSlot {
currentSlot = payloadAttributes.Slot
b.OnPayloadAttribute(&payloadAttributes)
}
}
}
}()

return b.relay.Start()
}

func (b *Builder) Stop() error {
close(b.stop)
return nil
}

Expand Down Expand Up @@ -277,18 +302,19 @@ func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) erro
b.slotMu.Lock()
defer b.slotMu.Unlock()

if b.slot != attrs.Slot {
if b.slotCtxCancel != nil {
b.slotCtxCancel()
}
// Forcibly cancel previous building job, build on top of reorgable blocks as this is the behaviour relays expect.
// This will change in the future

slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second)
b.slot = attrs.Slot
b.slotAttrs = nil
b.slotCtx = slotCtx
b.slotCtxCancel = slotCtxCancel
if b.slotCtxCancel != nil {
b.slotCtxCancel()
}

slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second)
b.slot = attrs.Slot
b.slotAttrs = nil
b.slotCtx = slotCtx
b.slotCtxCancel = slotCtxCancel

for _, currentAttrs := range b.slotAttrs {
if attrs.Equal(&currentAttrs) {
log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash)
Expand Down
2 changes: 1 addition & 1 deletion builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestOnPayloadAttributes(t *testing.T) {

testEthService := &testEthereumService{synced: true, testExecutableData: testExecutableData, testBlock: testBlock, testBlockValue: big.NewInt(10)}

builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testRelay, bDomain, testEthService, false, nil)
builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testRelay, bDomain, testEthService, false, nil, &testBeacon)
builder.Start()
defer builder.Stop()

Expand Down
Loading

0 comments on commit d3b034a

Please sign in to comment.