Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(zetaclient)!: orchestrator V2 #3332

Merged
merged 28 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
04fa091
Minor fixes
swift1337 Dec 24, 2024
0c46e64
Add orchestrator V2. Move context updater to v2
swift1337 Dec 24, 2024
63961eb
Fix orchestrator_v2 test cases
swift1337 Dec 27, 2024
df21451
Fix flaky test cases during concurrent runs (spoiler: goroutines)
swift1337 Dec 27, 2024
cac2f61
Add V2 to start.go
swift1337 Dec 27, 2024
eff0517
chain sync skeleton
swift1337 Dec 30, 2024
7232f53
Move common btc stuff to common/ to fix import cycle
swift1337 Dec 30, 2024
e13d64c
Implement BTC observerSigner
swift1337 Jan 3, 2025
91bab84
Drop redundant code
swift1337 Jan 3, 2025
91dd276
Fix ticker concurrency bug
swift1337 Jan 3, 2025
991b751
Add scheduler.Tasks()
swift1337 Jan 3, 2025
22d8a51
Add v2 btc observer-signer 101 test cases. Drop redundant tests
swift1337 Jan 3, 2025
e81db51
Address PR comments
swift1337 Jan 6, 2025
c637cee
Add issue
swift1337 Jan 7, 2025
bc5dd8f
fix inbound debug cmd
swift1337 Jan 7, 2025
87820c3
Merge branch 'develop' into feat/btc-observer-signer
swift1337 Jan 7, 2025
ee5b997
Add tss graceful shutdown
swift1337 Jan 7, 2025
5e89631
Update changelog
swift1337 Jan 7, 2025
d2d98da
fix tss tests
swift1337 Jan 7, 2025
ffcd731
Fix IntervalUpdater
swift1337 Jan 8, 2025
a04244b
Mitigate errors when BTC node is disabled
swift1337 Jan 8, 2025
c91aeea
Implement pkg/fanout
swift1337 Jan 8, 2025
3b6cb6d
Apply fanout to block subscriber
swift1337 Jan 8, 2025
ca9c7f8
Merge branch 'develop' into feat/btc-observer-signer
swift1337 Jan 8, 2025
d7e5d55
Fix typo
swift1337 Jan 9, 2025
f35ea11
Minor btc signer improvements
swift1337 Jan 10, 2025
849bdde
Make V1.Stop() safe to call multiple times
swift1337 Jan 10, 2025
c1c0078
FIX DATA RACE
swift1337 Jan 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions cmd/zetaclientd/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/zeta-chain/node/pkg/coin"
"github.com/zeta-chain/node/testutil/sample"
"github.com/zeta-chain/node/zetaclient/chains/base"
btcobserver "github.com/zeta-chain/node/zetaclient/chains/bitcoin/observer"
evmobserver "github.com/zeta-chain/node/zetaclient/chains/evm/observer"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
Expand Down Expand Up @@ -156,20 +155,21 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
}
fmt.Println("CoinType : ", coinType)
} else if chain.IsBitcoin() {
observer, ok := observers[chainID]
if !ok {
return fmt.Errorf("observer not found for btc chain %d", chainID)
}

btcObserver, ok := observer.(*btcobserver.Observer)
if !ok {
return fmt.Errorf("observer is not btc observer for chain %d", chainID)
}

ballotIdentifier, err = btcObserver.CheckReceiptForBtcTxHash(ctx, inboundHash, false)
if err != nil {
return err
}
return fmt.Errorf("not implemented")
//observer, ok := observers[chainID]
//if !ok {
// return fmt.Errorf("observer not found for btc chain %d", chainID)
//}
//
//btcObserver, ok := observer.(*btcobserver.Observer)
//if !ok {
// return fmt.Errorf("observer is not btc observer for chain %d", chainID)
//}
//
//ballotIdentifier, err = btcObserver.CheckReceiptForBtcTxHash(ctx, inboundHash, false)
//if err != nil {
// return err
//}
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
}

fmt.Println("BallotIdentifier: ", ballotIdentifier)
Expand Down
19 changes: 19 additions & 0 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/graceful"
zetaos "github.com/zeta-chain/node/pkg/os"
"github.com/zeta-chain/node/pkg/scheduler"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
Expand Down Expand Up @@ -152,9 +153,27 @@ func Start(_ *cobra.Command, _ []string) error {
return errors.Wrap(err, "unable to create orchestrator")
}

taskScheduler := scheduler.New(logger.Std)
maestroV2Deps := &orchestrator.Dependencies{
Zetacore: zetacoreClient,
TSS: tss,
DBPath: dbPath,
Telemetry: telemetry,
}

maestroV2, err := orchestrator.NewV2(taskScheduler, maestroV2Deps, logger)
if err != nil {
return errors.Wrap(err, "unable to create orchestrator V2")
}

// Start orchestrator with all observers and signers
graceful.AddService(ctx, maestro)

// Start orchestrator V2
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
// V2 will co-exist with V1 until all types of chains will be refactored (BTC, EVM, SOL, TON).
// (currently it's only BTC)
graceful.AddService(ctx, maestroV2)

// Block current routine until a shutdown signal is received
graceful.WaitForShutdown()

Expand Down
2 changes: 1 addition & 1 deletion e2e/e2etests/test_bitcoin_deposit_and_call_revert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/zeta-chain/node/e2e/runner"
"github.com/zeta-chain/node/e2e/utils"
"github.com/zeta-chain/node/testutil/sample"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinDepositAndCallRevert(r *runner.E2ERunner, args []string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/testutil/sample"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

// TestBitcoinDepositAndCallRevertWithDust sends a Bitcoin deposit that reverts with a dust amount in the revert outbound.
Expand Down
6 changes: 3 additions & 3 deletions e2e/e2etests/test_bitcoin_deposit_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/zeta-chain/node/e2e/utils"
testcontract "github.com/zeta-chain/node/testutil/contracts"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinDepositAndCall(r *runner.E2ERunner, args []string) {
Expand All @@ -20,7 +20,7 @@ func TestBitcoinDepositAndCall(r *runner.E2ERunner, args []string) {
// Given amount to send
require.Len(r, args, 1)
amount := utils.ParseFloat(r, args[0])
amountTotal := amount + zetabitcoin.DefaultDepositorFee
amountTotal := amount + common.DefaultDepositorFee

// Given a list of UTXOs
utxos, err := r.ListDeployerUTXOs()
Expand All @@ -45,7 +45,7 @@ func TestBitcoinDepositAndCall(r *runner.E2ERunner, args []string) {
utils.RequireCCTXStatus(r, cctx, crosschaintypes.CctxStatus_OutboundMined)

// check if example contract has been called, 'bar' value should be set to amount
amountSats, err := zetabitcoin.GetSatoshis(amount)
amountSats, err := common.GetSatoshis(amount)
require.NoError(r, err)
utils.MustHaveCalledExampleContract(r, contract, big.NewInt(amountSats))
}
2 changes: 1 addition & 1 deletion e2e/e2etests/test_bitcoin_donation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/zeta-chain/node/e2e/utils"
"github.com/zeta-chain/node/pkg/constant"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinDonation(r *runner.E2ERunner, args []string) {
Expand Down
4 changes: 2 additions & 2 deletions e2e/e2etests/test_bitcoin_std_deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/zeta-chain/node/e2e/utils"
"github.com/zeta-chain/node/pkg/memo"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinStdMemoDeposit(r *runner.E2ERunner, args []string) {
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestBitcoinStdMemoDeposit(r *runner.E2ERunner, args []string) {

// the runner balance should be increased by the deposit amount
amountIncreased := new(big.Int).Sub(balanceAfter, balanceBefore)
amountSatoshis, err := bitcoin.GetSatoshis(amount)
amountSatoshis, err := common.GetSatoshis(amount)
require.NoError(r, err)
require.Positive(r, amountSatoshis)
// #nosec G115 always positive
Expand Down
2 changes: 1 addition & 1 deletion e2e/e2etests/test_bitcoin_std_deposit_and_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/zeta-chain/node/pkg/memo"
testcontract "github.com/zeta-chain/node/testutil/contracts"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinStdMemoDepositAndCall(r *runner.E2ERunner, args []string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/zeta-chain/node/pkg/memo"
testcontract "github.com/zeta-chain/node/testutil/contracts"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
)

func TestBitcoinStdMemoInscribedDepositAndCall(r *runner.E2ERunner, args []string) {
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestBitcoinStdMemoInscribedDepositAndCall(r *runner.E2ERunner, args []strin
utils.RequireCCTXStatus(r, cctx, crosschaintypes.CctxStatus_OutboundMined)

// check if example contract has been called, 'bar' value should be set to correct amount
depositFeeSats, err := zetabitcoin.GetSatoshis(zetabitcoin.DefaultDepositorFee)
depositFeeSats, err := common.GetSatoshis(common.DefaultDepositorFee)
require.NoError(r, err)
receiveAmount := depositAmount - depositFeeSats
utils.MustHaveCalledExampleContract(r, contract, big.NewInt(receiveAmount))
Expand Down
12 changes: 6 additions & 6 deletions e2e/runner/bitcoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/memo"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
zetabitcoin "github.com/zeta-chain/node/zetaclient/chains/bitcoin"
zetabtc "github.com/zeta-chain/node/zetaclient/chains/bitcoin/common"
btcobserver "github.com/zeta-chain/node/zetaclient/chains/bitcoin/observer"
"github.com/zeta-chain/node/zetaclient/chains/bitcoin/signer"
)
Expand Down Expand Up @@ -100,7 +100,7 @@ func (r *E2ERunner) DepositBTCWithAmount(amount float64, memo *memo.InboundMemo)
r.Logger.Info("Now sending two txs to TSS address...")

// add depositor fee so that receiver gets the exact given 'amount' in ZetaChain
amount += zetabitcoin.DefaultDepositorFee
amount += zetabtc.DefaultDepositorFee

// deposit to TSS address
var txHash *chainhash.Hash
Expand Down Expand Up @@ -148,7 +148,7 @@ func (r *E2ERunner) DepositBTC(receiver common.Address) {
r.Logger.Info("Now sending two txs to TSS address and tester ZEVM address...")

// send initial BTC to the tester ZEVM address
amount := 1.15 + zetabitcoin.DefaultDepositorFee
amount := 1.15 + zetabtc.DefaultDepositorFee
txHash, err := r.DepositBTCWithLegacyMemo(amount, utxos[:2], receiver)
require.NoError(r, err)

Expand Down Expand Up @@ -241,7 +241,7 @@ func (r *E2ERunner) sendToAddrFromDeployerWithMemo(

// use static fee 0.0005 BTC to calculate change
feeSats := btcutil.Amount(0.0005 * btcutil.SatoshiPerBitcoin)
amountInt, err := zetabitcoin.GetSatoshis(amount)
amountInt, err := zetabtc.GetSatoshis(amount)
require.NoError(r, err)
amountSats := btcutil.Amount(amountInt)
change := inputSats - feeSats - amountSats
Expand Down Expand Up @@ -351,7 +351,7 @@ func (r *E2ERunner) InscribeToTSSFromDeployerWithMemo(

// parameters to build the reveal transaction
commitOutputIdx := uint32(0)
commitAmount, err := zetabitcoin.GetSatoshis(amount)
commitAmount, err := zetabtc.GetSatoshis(amount)
require.NoError(r, err)

// build the reveal transaction to spend above funds
Expand Down Expand Up @@ -412,7 +412,7 @@ func (r *E2ERunner) QueryOutboundReceiverAndAmount(txid string) (string, int64)
// parse receiver address from pkScript
txOutput := revertTx.MsgTx().TxOut[1]
pkScript := txOutput.PkScript
receiver, err := zetabitcoin.DecodeScriptP2WPKH(hex.EncodeToString(pkScript), r.BitcoinParams)
receiver, err := zetabtc.DecodeScriptP2WPKH(hex.EncodeToString(pkScript), r.BitcoinParams)
require.NoError(r, err)

return receiver, txOutput.Value
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ require (
github.com/google/gopacket v1.1.19 // indirect
github.com/google/orderedcode v0.0.1 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gorilla/handlers v1.5.1 // indirect
Expand Down Expand Up @@ -249,8 +249,8 @@ require (
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/client_model v0.4.0
github.com/prometheus/common v0.42.0
github.com/prometheus/procfs v0.9.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
Expand Down
9 changes: 9 additions & 0 deletions pkg/chains/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/btcsuite/btcd/chaincfg"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/tonkeeper/tongo/ton"

"github.com/zeta-chain/node/zetaclient/logs"
)

// Validate checks whether the chain is valid
Expand Down Expand Up @@ -108,6 +110,13 @@ func (chain Chain) IsTONChain() bool {
return chain.Consensus == Consensus_catchain_consensus
}

func (chain Chain) LogFields() map[string]any {
return map[string]any{
logs.FieldChain: chain.ChainId,
logs.FieldChainNetwork: chain.Network.String(),
}
}

// DecodeAddressFromChainID decode the address string to bytes
// additionalChains is a list of additional chains to search from
// in practice, it is used in the protocol to dynamically support new chains without doing an upgrade
Expand Down
7 changes: 5 additions & 2 deletions pkg/scheduler/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ func Interval(interval time.Duration) Opt {
return func(_ *Task, opts *taskOpts) { opts.interval = interval }
}

// Skipper sets task skipper function
// Skipper sets task skipper function. If it returns true, the task is skipped.
func Skipper(skipper func() bool) Opt {
return func(t *Task, _ *taskOpts) { t.skipper = skipper }
}

// IntervalUpdater sets interval updater function.
func IntervalUpdater(intervalUpdater func() time.Duration) Opt {
return func(_ *Task, opts *taskOpts) { opts.intervalUpdater = intervalUpdater }
return func(_ *Task, opts *taskOpts) {
opts.interval = intervalUpdater()
opts.intervalUpdater = intervalUpdater
}
}

// BlockTicker makes Task to listen for new zeta blocks
Expand Down
25 changes: 25 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ func (s *Scheduler) Register(ctx context.Context, exec Executable, opts ...Opt)
return task
}

func (s *Scheduler) Tasks() map[uuid.UUID]*Task {
s.mu.RLock()
defer s.mu.RUnlock()

copied := make(map[uuid.UUID]*Task, len(s.tasks))
for k, v := range s.tasks {
copied[k] = v
}

return copied
}

// Stop stops all tasks.
func (s *Scheduler) Stop() {
s.StopGroup("")
Expand All @@ -132,6 +144,11 @@ func (s *Scheduler) StopGroup(group Group) {
return
}

s.logger.Info().
Int("tasks", len(selectedTasks)).
Str("group", string(group)).
Msg("Stopping scheduler group")

// Stop all selected tasks concurrently
var wg sync.WaitGroup
wg.Add(len(selectedTasks))
Expand Down Expand Up @@ -161,6 +178,14 @@ func (t *Task) Stop() {
t.logger.Info().Int64("time_taken_ms", timeTakenMS).Msg("Stopped scheduler task")
}

func (t *Task) Group() Group {
return t.group
}

func (t *Task) Name() string {
return t.name
}

// execute executes Task with additional logging and metrics.
func (t *Task) execute(ctx context.Context) error {
startedAt := time.Now().UTC()
Expand Down
15 changes: 12 additions & 3 deletions pkg/ticker/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func Run(ctx context.Context, interval time.Duration, task Task, opts ...Opt) er
return New(interval, task, opts...).Start(ctx)
}

// Run runs the ticker by blocking current goroutine. It also invokes BEFORE ticker starts.
// Start runs the ticker by blocking current goroutine. It also invokes BEFORE ticker starts.
// Stops when (if any):
// - context is done (returns ctx.Err())
// - task returns an error or panics
Expand Down Expand Up @@ -139,7 +139,7 @@ func (t *Ticker) Start(ctx context.Context) (err error) {
case <-ctx.Done():
// if task is finished (i.e. last tick completed BEFORE ticker.Stop(),
// then we need to return nil)
if t.stopped {
if t.isStopped() {
return nil
}
return ctx.Err()
Expand Down Expand Up @@ -214,11 +214,20 @@ func (t *Ticker) setStopState() {

t.ctxCancel()
t.stopped = true
t.ticker.Stop()
if t.ticker != nil {
t.ticker.Stop()
}

t.logger.Info().Msgf("Ticker stopped")
}

func (t *Ticker) isStopped() bool {
t.stateMu.Lock()
defer t.stateMu.Unlock()

return t.stopped
}

// DurationFromUint64Seconds converts uint64 of seconds to time.Duration.
func DurationFromUint64Seconds(seconds uint64) time.Duration {
// #nosec G115 seconds should be in range and is not user controlled
Expand Down
6 changes: 6 additions & 0 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,13 @@ func (ob *Observer) SetChainParams(params observertypes.ChainParams) {
ob.mu.Lock()
defer ob.mu.Unlock()

if observertypes.ChainParamsEqual(ob.chainParams, params) {
return
}

ob.chainParams = params

ob.logger.Chain.Info().Any("observer.chain_params", params).Msg("updated chain params")
}

// ZetacoreClient returns the zetacore client for the observer.
Expand Down
Loading
Loading