Skip to content

Commit

Permalink
refactor to use shared pools list updater + multiple masters + backwa… (
Browse files Browse the repository at this point in the history
#683)

* refactor to use shared pools list updater + multiple masters + backward compatible

* lint

* fix init metadata
  • Loading branch information
tien7668 authored Jan 6, 2025
1 parent 693ca21 commit 8e86a36
Show file tree
Hide file tree
Showing 17 changed files with 756 additions and 300 deletions.
2 changes: 0 additions & 2 deletions pkg/liquidity-source/syncswapv2/aqua/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ var (
reserveZero = "0"
addressZero = "0x0000000000000000000000000000000000000000"

poolMasterMethodPoolsLength = "poolsLength"
poolMasterMethodPools = "pools"
poolMethodPoolType = "poolType"
poolMethodGetAssets = "getAssets"
poolMethodGetReserves = "getReserves"
Expand Down
14 changes: 10 additions & 4 deletions pkg/liquidity-source/syncswapv2/aqua/pool_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/holiman/uint256"

"github.com/KyberNetwork/kyberswap-dex-lib/pkg/entity"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/liquidity-source/syncswapv2"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/source/pool"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/source/syncswap"
)

// const (
Expand Down Expand Up @@ -50,12 +50,12 @@ type PoolParams struct {
}

type PoolTracker struct {
config *syncswap.Config
config *syncswapv2.Config
ethrpcClient *ethrpc.Client
}

func NewPoolTracker(
config *syncswap.Config,
config *syncswapv2.Config,
ethrpcClient *ethrpc.Client,
) *PoolTracker {
return &PoolTracker{
Expand Down Expand Up @@ -91,10 +91,15 @@ func (d *PoolTracker) GetNewPoolState(
}
feeManagerV2Address = common.HexToAddress(extra.FeeManagerAddress)

masterAddress := d.config.MasterAddress[0]
if extra.MasterAddress != "" {
masterAddress = extra.MasterAddress
}

calls := d.ethrpcClient.NewRequest().SetContext(ctx)
calls.AddCall(&ethrpc.Call{
ABI: masterABI,
Target: d.config.MasterAddress,
Target: masterAddress,
Method: poolMethodGetFeeManager,
Params: nil,
}, []interface{}{&feeManagerV2Address})
Expand Down Expand Up @@ -272,6 +277,7 @@ func (d *PoolTracker) GetNewPoolState(
FutureGamma: int64(poolParams.FutureGamma),
InitialTime: int64(poolParams.InitialTime),
FeeManagerAddress: feeManagerV2Address.Hex(),
MasterAddress: masterAddress,
})

if err != nil {
Expand Down
108 changes: 15 additions & 93 deletions pkg/liquidity-source/syncswapv2/aqua/pools_list_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package syncswapv2aqua

import (
"context"
"math/big"
"strings"
"time"

Expand All @@ -12,117 +11,39 @@ import (
"github.com/goccy/go-json"

"github.com/KyberNetwork/kyberswap-dex-lib/pkg/entity"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/source/syncswap"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/util"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/liquidity-source/syncswapv2"

syncswapv2shared "github.com/KyberNetwork/kyberswap-dex-lib/pkg/liquidity-source/syncswapv2/shared"
)

type PoolsListUpdater struct {
config *syncswap.Config
ethrpcClient *ethrpc.Client
syncswapv2shared.PoolsListUpdater
}

func NewPoolsListUpdater(
config *syncswap.Config,
config *syncswapv2.Config,
ethrpcClient *ethrpc.Client,
) *PoolsListUpdater {
return &PoolsListUpdater{
config: config,
ethrpcClient: ethrpcClient,
PoolsListUpdater: syncswapv2shared.PoolsListUpdater{
Config: config,
EthrpcClient: ethrpcClient,
},
}
}

func (d *PoolsListUpdater) GetNewPools(ctx context.Context, metadataBytes []byte) ([]entity.Pool, []byte, error) {
var metadata syncswap.Metadata
if len(metadataBytes) != 0 {
if err := json.Unmarshal(metadataBytes, &metadata); err != nil {
return nil, metadataBytes, err
}
}

ctx = util.NewContextWithTimestamp(ctx)

var lengthBI *big.Int
if _, err := d.ethrpcClient.NewRequest().AddCall(&ethrpc.Call{
ABI: masterABI,
Target: d.config.MasterAddress,
Method: poolMasterMethodPoolsLength,
Params: nil,
}, []interface{}{&lengthBI}).Call(); err != nil {
logger.WithFields(logger.Fields{
"error": err,
}).Errorf("failed to get number of pools from master address")

return nil, metadataBytes, err
}

totalNumberOfPools := int(lengthBI.Int64())
batchSize := d.config.NewPoolLimit
currentOffset := metadata.Offset
if currentOffset+batchSize > totalNumberOfPools {
batchSize = totalNumberOfPools - currentOffset
if batchSize <= 0 {
return nil, metadataBytes, nil
}
}

getPoolAddressRequest := d.ethrpcClient.NewRequest()
var poolAddresses = make([]common.Address, batchSize)
for i := 0; i < batchSize; i++ {
getPoolAddressRequest.AddCall(&ethrpc.Call{
ABI: masterABI,
Target: d.config.MasterAddress,
Method: poolMasterMethodPools,
Params: []interface{}{big.NewInt(int64(currentOffset + i))},
}, []interface{}{&poolAddresses[i]})
}
if _, err := getPoolAddressRequest.Aggregate(); err != nil {
logger.WithFields(logger.Fields{
"error": err,
}).Errorf("failed to get pool addresses")

return nil, metadataBytes, err
}

pools, err := d.processBatch(ctx, poolAddresses)
if err != nil {
logger.WithFields(logger.Fields{
"error": err,
}).Errorf("failed to process get pool states")

return nil, metadataBytes, err
}

if len(pools) > 0 {
logger.WithFields(logger.Fields{
"dexID": d.config.DexID,
"batchSize": batchSize,
"totalNumberOfUpdatedPools": currentOffset + batchSize,
"totalNumberOfPools": totalNumberOfPools,
}).Info("scan SyncSwapPoolMaster")
}

nextOffset := currentOffset + batchSize
if nextOffset > totalNumberOfPools {
nextOffset = totalNumberOfPools
}
newMetadataBytes, err := json.Marshal(syncswap.Metadata{
Offset: nextOffset,
})
if err != nil {
return nil, metadataBytes, err
}

return pools, newMetadataBytes, nil
return d.GetPools(ctx, metadataBytes, d.processBatch)
}

func (d *PoolsListUpdater) processBatch(ctx context.Context, poolAddresses []common.Address) ([]entity.Pool, error) {
func (d *PoolsListUpdater) processBatch(ctx context.Context, poolAddresses []common.Address, masterAddresses []string) ([]entity.Pool, error) {
var (
poolTypes = make([]uint16, len(poolAddresses))
assets = make([][2]common.Address, len(poolAddresses))
feeManagers = make([]common.Address, len(poolAddresses))
)

calls := d.ethrpcClient.NewRequest().SetContext(ctx)
calls := d.PoolsListUpdater.EthrpcClient.NewRequest().SetContext(ctx)
for i := 0; i < len(poolAddresses); i++ {
calls.AddCall(&ethrpc.Call{
ABI: aquaPoolABI,
Expand All @@ -139,7 +60,7 @@ func (d *PoolsListUpdater) processBatch(ctx context.Context, poolAddresses []com
}, []interface{}{&assets[i]})
calls.AddCall(&ethrpc.Call{
ABI: masterABI,
Target: d.config.MasterAddress,
Target: masterAddresses[i],
Method: poolMethodGetFeeManager,
Params: nil,
}, []interface{}{&feeManagers[i]})
Expand All @@ -163,6 +84,7 @@ func (d *PoolsListUpdater) processBatch(ctx context.Context, poolAddresses []com
}
temp, err := json.Marshal(ExtraAquaPool{
FeeManagerAddress: feeManagers[i].Hex(),
MasterAddress: masterAddresses[i],
})
if err != nil {
return nil, err
Expand All @@ -182,7 +104,7 @@ func (d *PoolsListUpdater) processBatch(ctx context.Context, poolAddresses []com

newPool := entity.Pool{
Address: poolAddress,
Exchange: d.config.DexID,
Exchange: d.PoolsListUpdater.Config.DexID,
Type: PoolTypeSyncSwapV2Aqua,
Timestamp: time.Now().Unix(),
Reserves: entity.PoolReserves{reserveZero, reserveZero},
Expand Down
1 change: 1 addition & 0 deletions pkg/liquidity-source/syncswapv2/aqua/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ type ExtraAquaPool struct {
InitialGamma int64 `json:"initialGamma"`
FutureGamma int64 `json:"futureGamma"`
FeeManagerAddress string `json:"feeManagerAddress"`
MasterAddress string `json:"masterAddress"`
}
12 changes: 5 additions & 7 deletions pkg/liquidity-source/syncswapv2/classic/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ var (
reserveZero = "0"
addressZero = "0x0000000000000000000000000000000000000000"

poolMasterMethodPoolsLength = "poolsLength"
poolMasterMethodPools = "pools"
poolMethodPoolType = "poolType"
poolMethodGetAssets = "getAssets"
poolMethodGetSwapFee = "getSwapFee"
poolMethodGetReserves = "getReserves"
poolMethodVault = "vault"
poolMethodPoolType = "poolType"
poolMethodGetAssets = "getAssets"
poolMethodGetSwapFee = "getSwapFee"
poolMethodGetReserves = "getReserves"
poolMethodVault = "vault"
)

var (
Expand Down
6 changes: 3 additions & 3 deletions pkg/liquidity-source/syncswapv2/classic/pool_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
"github.com/holiman/uint256"

"github.com/KyberNetwork/kyberswap-dex-lib/pkg/entity"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/liquidity-source/syncswapv2"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/source/pool"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/source/syncswap"
)

type PoolTracker struct {
config *syncswap.Config
config *syncswapv2.Config
ethrpcClient *ethrpc.Client
}

func NewPoolTracker(
config *syncswap.Config,
config *syncswapv2.Config,
ethrpcClient *ethrpc.Client,
) *PoolTracker {
return &PoolTracker{
Expand Down
Loading

0 comments on commit 8e86a36

Please sign in to comment.