Fix: Data Integrity Issue #122

Merged: 19 commits, Dec 2, 2024 · Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/regression-tests.yml
@@ -30,7 +30,7 @@ jobs:
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
ref: "v0.2.22"
ref: v0.2.24
path: kurtosis-cdk

- name: Install Kurtosis CDK tools
6 changes: 0 additions & 6 deletions .mockery.yaml
@@ -36,9 +36,3 @@ packages:
SequencerTracker:
config:
filename: sequencer_tracker.generated.go
github.com/0xPolygon/cdk-data-availability/services/status:
config:
interfaces:
GapsDetector:
config:
filename: gaps_detector.generated.go
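
Dropping the GapsDetector entry lines up with the removal of the gap-detection query and the DetectOffchainDataGaps method from db/db.go further down in this diff; with that interface gone, there is nothing left for mockery to generate.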
21 changes: 21 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,21 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Run DAC",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}/cmd",
"args": [
"run",
"-cfg",
"test/config/test.local.toml",
],
"cwd": ".",
},
]
}
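
For reference, this launch configuration is roughly the IDE equivalent of running `go run ./cmd run -cfg test/config/test.local.toml` from the repository root; the entrypoint path and config file are taken from the configuration above and are otherwise not verified here.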
10 changes: 5 additions & 5 deletions client/client_test.go
@@ -28,12 +28,12 @@ func TestClient_GetStatus(t *testing.T) {
}{
{
name: "successfully got status",
result: `{"result":{"version":"v1.0.0","uptime":"123","key_count":2,"backfill_progress":5}}`,
result: `{"result":{"version":"v1.0.0","uptime":"123","key_count":2,"last_synchronized_block":5}}`,
status: &types.DACStatus{
Uptime: "123",
Version: "v1.0.0",
KeyCount: 2,
BackfillProgress: 5,
Uptime: "123",
Version: "v1.0.0",
KeyCount: 2,
LastSynchronizedBlock: 5,
},
},
{
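
For context, the renamed JSON field in the fixture above implies a matching rename in the status type. Here is a minimal sketch of what types.DACStatus presumably looks like after this change; the field types and JSON tags are inferred from the test payload, not copied from the types package:

```go
// Sketch only: inferred from the test fixture, not taken verbatim from types.
type DACStatus struct {
	Uptime                string `json:"uptime"`
	Version               string `json:"version"`
	KeyCount              uint64 `json:"key_count"`
	LastSynchronizedBlock uint64 `json:"last_synchronized_block"` // replaces BackfillProgress
}
```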
2 changes: 1 addition & 1 deletion cmd/main.go
@@ -154,7 +154,7 @@ func start(cliCtx *cli.Context) error {
[]rpc.Service{
{
Name: status.APISTATUS,
Service: status.NewEndpoints(storage, batchSynchronizer),
Service: status.NewEndpoints(storage),
},
{
Name: sync.APISYNC,
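
The call-site change above suggests the status service no longer needs the batch synchronizer, only storage. A hedged sketch of the constructor shape this implies; the Endpoints struct and the parameter type are assumptions, and only the NewEndpoints(storage) call comes from the diff:

```go
// Assumed shape: with gap detection and backfill progress removed, the status
// endpoints presumably depend on the storage layer alone.
func NewEndpoints(storage db.DB) *Endpoints {
	return &Endpoints{storage: storage}
}
```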
140 changes: 42 additions & 98 deletions db/db.go
@@ -15,52 +15,32 @@ import (
const (
// storeLastProcessedBlockSQL is a query that stores the last processed block for a given task
storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_tasks (task, block)
VALUES ($1, $2)
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();`
UPDATE data_node.sync_tasks
SET block = $2, processed = NOW()
WHERE task = $1;`

// getLastProcessedBlockSQL is a query that returns the last processed block for a given task
getLastProcessedBlockSQL = `SELECT block FROM data_node.sync_tasks WHERE task = $1;`

// getUnresolvedBatchKeysSQL is a query that returns the unresolved batch keys from the database
getUnresolvedBatchKeysSQL = `SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;`
// getMissingBatchKeysSQL is a query that returns the missing batch keys from the database
getMissingBatchKeysSQL = `SELECT num, hash FROM data_node.missing_batches LIMIT $1;`

// getOffchainDataSQL is a query that returns the offchain data for a given key
getOffchainDataSQL = `
SELECT key, value, batch_num
SELECT key, value
FROM data_node.offchain_data
WHERE key = $1 LIMIT 1;
`

// listOffchainDataSQL is a query that returns the offchain data for a given list of keys
listOffchainDataSQL = `
SELECT key, value, batch_num
SELECT key, value
FROM data_node.offchain_data
WHERE key IN (?);
`

// countOffchainDataSQL is a query that returns the count of rows in the offchain_data table
countOffchainDataSQL = "SELECT COUNT(*) FROM data_node.offchain_data;"

// selectOffchainDataGapsSQL is a query that returns the gaps in the offchain_data table
selectOffchainDataGapsSQL = `
WITH numbered_batches AS (
SELECT
batch_num,
ROW_NUMBER() OVER (ORDER BY batch_num) AS row_number
FROM data_node.offchain_data
)
SELECT
nb1.batch_num AS current_batch_num,
nb2.batch_num AS next_batch_num
FROM
numbered_batches nb1
LEFT JOIN numbered_batches nb2 ON nb1.row_number = nb2.row_number - 1
WHERE
nb1.batch_num IS NOT NULL
AND nb2.batch_num IS NOT NULL
AND nb1.batch_num + 1 <> nb2.batch_num;`
)

var (
@@ -73,15 +53,14 @@ type DB interface {
StoreLastProcessedBlock(ctx context.Context, block uint64, task string) error
GetLastProcessedBlock(ctx context.Context, task string) (uint64, error)

StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error
GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error)
DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error
StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error
GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error)
DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error

GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error)
ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error)
StoreOffChainData(ctx context.Context, od []types.OffChainData) error
CountOffchainData(ctx context.Context) (uint64, error)
DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error)
}

// DB is the database layer of the data node
@@ -90,10 +69,9 @@ type pgDB struct {

storeLastProcessedBlockStmt *sqlx.Stmt
getLastProcessedBlockStmt *sqlx.Stmt
getUnresolvedBatchKeysStmt *sqlx.Stmt
getMissingBatchKeysStmt *sqlx.Stmt
getOffChainDataStmt *sqlx.Stmt
countOffChainDataStmt *sqlx.Stmt
detectOffChainDataGapsStmt *sqlx.Stmt
}

// New instantiates a DB
@@ -108,9 +86,9 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) {
return nil, fmt.Errorf("failed to prepare the get last processed block statement: %w", err)
}

getUnresolvedBatchKeysStmt, err := pg.PreparexContext(ctx, getUnresolvedBatchKeysSQL)
getMissingBatchKeysStmt, err := pg.PreparexContext(ctx, getMissingBatchKeysSQL)
if err != nil {
return nil, fmt.Errorf("failed to prepare the get unresolved batch keys statement: %w", err)
return nil, fmt.Errorf("failed to prepare the get missing batch keys statement: %w", err)
}

getOffChainDataStmt, err := pg.PreparexContext(ctx, getOffchainDataSQL)
@@ -123,19 +101,13 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) {
return nil, fmt.Errorf("failed to prepare the count offchain data statement: %w", err)
}

detectOffChainDataGapsStmt, err := pg.PreparexContext(ctx, selectOffchainDataGapsSQL)
if err != nil {
return nil, fmt.Errorf("failed to prepare the detect offchain data gaps statement: %w", err)
}

return &pgDB{
pg: pg,
storeLastProcessedBlockStmt: storeLastProcessedBlockStmt,
getLastProcessedBlockStmt: getLastProcessedBlockStmt,
getUnresolvedBatchKeysStmt: getUnresolvedBatchKeysStmt,
getMissingBatchKeysStmt: getMissingBatchKeysStmt,
getOffChainDataStmt: getOffChainDataStmt,
countOffChainDataStmt: countOffChainDataStmt,
detectOffChainDataGapsStmt: detectOffChainDataGapsStmt,
}, nil
}

@@ -156,24 +128,28 @@ func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64,
return lastBlock, nil
}

// StoreUnresolvedBatchKeys stores unresolved batch keys in the database
func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error {
// StoreMissingBatchKeys stores missing batch keys in the database
func (db *pgDB) StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error {
if len(bks) == 0 {
return nil
}

query, args := buildBatchKeysInsertQuery(bks)

if _, err := db.pg.ExecContext(ctx, query, args...); err != nil {
return fmt.Errorf("failed to store unresolved batches: %w", err)
batchNumbers := make([]string, len(bks))
for i, bk := range bks {
batchNumbers[i] = fmt.Sprintf("%d", bk.Number)
}
return fmt.Errorf("failed to store missing batches (batch numbers: %s): %w", strings.Join(batchNumbers, ", "), err)
}

return nil
}

// GetUnresolvedBatchKeys returns the unresolved batch keys from the database
func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) {
rows, err := db.getUnresolvedBatchKeysStmt.QueryxContext(ctx, limit)
// GetMissingBatchKeys returns the batch keys whose data is not yet present in the offchain_data table
func (db *pgDB) GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) {
rows, err := db.getMissingBatchKeysStmt.QueryxContext(ctx, limit)
if err != nil {
return nil, err
}
@@ -201,8 +177,8 @@ func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types
return bks, nil
}

// DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database
func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error {
// DeleteMissingBatchKeys deletes the given batch keys from the missing_batches table in the database
func (db *pgDB) DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error {
if len(bks) == 0 {
return nil
}
@@ -218,11 +194,11 @@ func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.Batch
}

query := fmt.Sprintf(`
DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (%s);
DELETE FROM data_node.missing_batches WHERE (num, hash) IN (%s);
`, strings.Join(values, ","))

if _, err := db.pg.ExecContext(ctx, query, args...); err != nil {
return fmt.Errorf("failed to delete unresolved batches: %w", err)
return fmt.Errorf("failed to delete missing batches: %w", err)
}

return nil
@@ -245,9 +221,8 @@ func (db *pgDB) StoreOffChainData(ctx context.Context, ods []types.OffChainData)
// GetOffChainData returns the value identified by the key
func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) {
data := struct {
Key string `db:"key"`
Value string `db:"value"`
BatchNum uint64 `db:"batch_num"`
Key string `db:"key"`
Value string `db:"value"`
}{}

if err := db.getOffChainDataStmt.QueryRowxContext(ctx, key.Hex()).StructScan(&data); err != nil {
@@ -259,9 +234,8 @@ func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.Of
}

return &types.OffChainData{
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
BatchNum: data.BatchNum,
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
}, nil
}

Expand Down Expand Up @@ -292,9 +266,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ
defer rows.Close()

type row struct {
Key string `db:"key"`
Value string `db:"value"`
BatchNum uint64 `db:"batch_num"`
Key string `db:"key"`
Value string `db:"value"`
}

list := make([]types.OffChainData, 0, len(keys))
@@ -305,9 +278,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ
}

list = append(list, types.OffChainData{
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
BatchNum: data.BatchNum,
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
})
}

@@ -324,34 +296,7 @@ func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) {
return count, nil
}

// DetectOffchainDataGaps returns the number of gaps in the offchain_data table
func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) {
rows, err := db.detectOffChainDataGapsStmt.QueryxContext(ctx)
if err != nil {
return nil, err
}

defer rows.Close()

type row struct {
CurrentBatchNum uint64 `db:"current_batch_num"`
NextBatchNum uint64 `db:"next_batch_num"`
}

gaps := make(map[uint64]uint64)
for rows.Next() {
var data row
if err = rows.StructScan(&data); err != nil {
return nil, err
}

gaps[data.CurrentBatchNum] = data.NextBatchNum
}

return gaps, nil
}

// buildBatchKeysInsertQuery builds the query to insert unresolved batch keys
// buildBatchKeysInsertQuery builds the query to insert missing batch keys
func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) {
const columnsAffected = 2

@@ -364,32 +309,31 @@ func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) {
}

return fmt.Sprintf(`
INSERT INTO data_node.unresolved_batches (num, hash)
INSERT INTO data_node.missing_batches (num, hash)
VALUES %s
ON CONFLICT (num, hash) DO NOTHING;
`, strings.Join(values, ",")), args
}

// buildOffchainDataInsertQuery builds the query to insert offchain data
func buildOffchainDataInsertQuery(ods []types.OffChainData) (string, []interface{}) {
const columnsAffected = 3
const columnsAffected = 2

// Remove duplicates from the given offchain data
ods = types.RemoveDuplicateOffChainData(ods)

args := make([]interface{}, len(ods)*columnsAffected)
values := make([]string, len(ods))
for i, od := range ods {
values[i] = fmt.Sprintf("($%d, $%d, $%d)", i*columnsAffected+1, i*columnsAffected+2, i*columnsAffected+3) //nolint:mnd
values[i] = fmt.Sprintf("($%d, $%d)", i*columnsAffected+1, i*columnsAffected+2) //nolint:mnd
args[i*columnsAffected] = od.Key.Hex()
args[i*columnsAffected+1] = common.Bytes2Hex(od.Value)
args[i*columnsAffected+2] = od.BatchNum
}

return fmt.Sprintf(`
INSERT INTO data_node.offchain_data (key, value, batch_num)
INSERT INTO data_node.offchain_data (key, value)
VALUES %s
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num;
SET value = EXCLUDED.value;
`, strings.Join(values, ",")), args
}
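
Taken together, the renamed DB methods describe a simple reconciliation loop over missing batches. Below is an illustrative sketch of how a caller might drive them; resolveBatch and the sketch package are hypothetical stand-ins for the batch synchronizer, and only the db.DB methods and types come from this diff:

```go
package sketch // illustrative only, not part of this PR

import (
	"context"
	"fmt"

	"github.com/0xPolygon/cdk-data-availability/db"
	"github.com/0xPolygon/cdk-data-availability/types"
)

// resolveBatch is a hypothetical resolver; in the real service the batch
// synchronizer fills this role by fetching data from the sequencer or peers.
var resolveBatch func(ctx context.Context, bk types.BatchKey) (types.OffChainData, error)

// reconcileMissingBatches fetches missing batch keys, resolves their off-chain
// data, stores it, and then clears the resolved keys.
func reconcileMissingBatches(ctx context.Context, store db.DB, limit uint) error {
	keys, err := store.GetMissingBatchKeys(ctx, limit)
	if err != nil {
		return fmt.Errorf("get missing batch keys: %w", err)
	}
	if len(keys) == 0 {
		return nil
	}

	resolved := make([]types.OffChainData, 0, len(keys))
	for _, bk := range keys {
		od, err := resolveBatch(ctx, bk)
		if err != nil {
			return fmt.Errorf("resolve batch %d: %w", bk.Number, err)
		}
		resolved = append(resolved, od)
	}

	if err := store.StoreOffChainData(ctx, resolved); err != nil {
		return fmt.Errorf("store offchain data: %w", err)
	}

	return store.DeleteMissingBatchKeys(ctx, keys)
}
```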