-
Notifications
You must be signed in to change notification settings - Fork 83
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
259 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package msgexec | ||
|
||
import ( | ||
"fmt" | ||
|
||
utils "github.com/forbole/juno/v5/cmd/migrate/utils" | ||
|
||
parse "github.com/forbole/juno/v5/cmd/parse/types" | ||
"github.com/forbole/juno/v5/database" | ||
msgexecdb "github.com/forbole/juno/v5/database/legacy/msgexec" | ||
"github.com/forbole/juno/v5/database/postgresql" | ||
) | ||
|
||
// RunMigration runs the migrations from v4 to v5 | ||
func RunMigration(parseConfig *parse.Config) error { | ||
cfg, err := GetConfig() | ||
if err != nil { | ||
return fmt.Errorf("error while reading config: %s", err) | ||
} | ||
|
||
// Migrate the database | ||
err = migrateDb(cfg, parseConfig) | ||
if err != nil { | ||
return fmt.Errorf("error while migrating database: %s", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func migrateDb(cfg utils.Config, parseConfig *parse.Config) error { | ||
// Build the codec | ||
encodingConfig := parseConfig.GetEncodingConfigBuilder()() | ||
|
||
// Get the db | ||
databaseCtx := database.NewContext(cfg.Database, &encodingConfig, parseConfig.GetLogger()) | ||
db, err := postgresql.Builder(databaseCtx) | ||
if err != nil { | ||
return fmt.Errorf("error while building the db: %s", err) | ||
} | ||
|
||
// Build the migrator and perform the migrations | ||
migrator := msgexecdb.NewMigrator(db.(*postgresql.Database)) | ||
return migrator.Migrate() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package msgexec | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"path" | ||
|
||
utils "github.com/forbole/juno/v5/cmd/migrate/utils" | ||
"gopkg.in/yaml.v3" | ||
|
||
"github.com/forbole/juno/v5/types/config" | ||
) | ||
|
||
// GetConfig returns the configuration reading it from the config.yaml file present inside the home directory | ||
func GetConfig() (utils.Config, error) { | ||
file := path.Join(config.HomePath, "config.yaml") | ||
|
||
// Make sure the path exists | ||
if _, err := os.Stat(file); os.IsNotExist(err) { | ||
return utils.Config{}, fmt.Errorf("config file does not exist") | ||
} | ||
|
||
bz, err := os.ReadFile(file) | ||
if err != nil { | ||
return utils.Config{}, fmt.Errorf("error while reading config files: %s", err) | ||
} | ||
|
||
var cfg utils.Config | ||
err = yaml.Unmarshal(bz, &cfg) | ||
return cfg, err | ||
} |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
package msgexec | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
|
||
sdk "github.com/cosmos/cosmos-sdk/types" | ||
"github.com/lib/pq" | ||
"github.com/rs/zerolog/log" | ||
|
||
dbtypes "github.com/forbole/juno/v5/database/migrate/utils" | ||
"github.com/forbole/juno/v5/types" | ||
) | ||
|
||
// Migrate implements database.Migrator | ||
func (db *Migrator) Migrate() error { | ||
msgTypes, err := db.getAllMsgExecStoredInDatabase() | ||
if err != nil { | ||
return fmt.Errorf("error while getting message types rows: %s", err) | ||
} | ||
|
||
var skipped = 0 | ||
// Migrate the transactions | ||
log.Info().Msg("** migrating transactions **") | ||
log.Debug().Int("tx count", len(msgTypes)).Msg("processing total transactions") | ||
|
||
for i, msgType := range msgTypes { | ||
log.Debug().Str("tx hash", msgType.TransactionHash).Msg("getting transaction....") | ||
fmt.Printf("\n processing %d/%d transaction \n", i, len(msgTypes)) | ||
|
||
tx, err := db.getMsgExecTransactionsFromDatabase(msgType.TransactionHash) | ||
if err != nil { | ||
return fmt.Errorf("error while getting transaction %s: %s", msgType.TransactionHash, err) | ||
} | ||
|
||
if tx.Success == "true" { | ||
var msgs sdk.ABCIMessageLogs | ||
err = json.Unmarshal([]byte(tx.Logs), &msgs) | ||
if err != nil { | ||
skipped++ | ||
continue | ||
} | ||
|
||
var addresses []string | ||
|
||
for _, msg := range msgs { | ||
for _, event := range msg.Events { | ||
for _, attribute := range event.Attributes { | ||
// Try parsing the address as a validator address | ||
validatorAddress, _ := sdk.ValAddressFromBech32(attribute.Value) | ||
if validatorAddress != nil { | ||
addresses = append(addresses, validatorAddress.String()) | ||
} | ||
|
||
// Try parsing the address as an account address | ||
accountAddress, err := sdk.AccAddressFromBech32(attribute.Value) | ||
if err != nil { | ||
// Skip if the address is not an account address | ||
continue | ||
} | ||
|
||
addresses = append(addresses, accountAddress.String()) | ||
} | ||
} | ||
} | ||
involvedAddresses := db.removeDuplicates(addresses) | ||
|
||
fmt.Printf("\n ADDRESSES BEFORE %s", msgType.InvolvedAccountsAddresses) | ||
fmt.Printf("\n ADDRESSES AFTER %s \n", involvedAddresses) | ||
|
||
err = db.updateMessage(types.NewMessage(msgType.TransactionHash, | ||
int(msgType.Index), | ||
msgType.Type, | ||
msgType.Value, | ||
involvedAddresses, | ||
msgType.Height), msgType.PartitionID) | ||
|
||
if err != nil { | ||
fmt.Printf("error while storing updated message: %s", err) | ||
skipped++ | ||
continue | ||
} | ||
} else { | ||
skipped++ | ||
} | ||
|
||
} | ||
|
||
log.Debug().Int("*** Total Skipped ***", skipped) | ||
|
||
return nil | ||
|
||
} | ||
|
||
// getMsgTypesFromMessageTable retrieves messages types stored in database inside message table | ||
func (db *Migrator) getAllMsgExecStoredInDatabase() ([]dbtypes.MessageRow, error) { | ||
const msgType = "cosmos.authz.v1beta1.MsgExec" | ||
var rows []dbtypes.MessageRow | ||
err := db.SQL.Select(&rows, `SELECT * FROM message WHERE type = $1 ORDER BY height ASC`, msgType) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return rows, nil | ||
} | ||
|
||
// getMsgTypesFromMessageTable retrieves messages types stored in database inside message table | ||
func (db *Migrator) getMsgExecTransactionsFromDatabase(txHash string) (dbtypes.TransactionRow, error) { | ||
var rows []dbtypes.TransactionRow | ||
err := db.SQL.Select(&rows, `SELECT * FROM transaction WHERE hash = $1`, txHash) | ||
if err != nil { | ||
return dbtypes.TransactionRow{}, err | ||
} | ||
|
||
return rows[0], nil | ||
} | ||
|
||
// updateMessage stores updated message inside the database | ||
func (db *Migrator) updateMessage(msg *types.Message, partitionID int64) error { | ||
stmt := ` | ||
INSERT INTO message(transaction_hash, index, type, value, involved_accounts_addresses, height, partition_id) | ||
VALUES ($1, $2, $3, $4, $5, $6, $7) | ||
ON CONFLICT (transaction_hash, index, partition_id) DO UPDATE | ||
SET height = excluded.height, | ||
type = excluded.type, | ||
value = excluded.value, | ||
involved_accounts_addresses = excluded.involved_accounts_addresses` | ||
|
||
_, err := db.SQL.Exec(stmt, msg.TxHash, msg.Index, msg.Type, msg.Value, pq.Array(msg.Addresses), msg.Height, partitionID) | ||
return err | ||
|
||
} | ||
|
||
// function to remove duplicate values | ||
func (db *Migrator) removeDuplicates(s []string) []string { | ||
bucket := make(map[string]bool) | ||
var result []string | ||
for _, str := range s { | ||
if _, ok := bucket[str]; !ok { | ||
bucket[str] = true | ||
result = append(result, str) | ||
} | ||
} | ||
return result | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package msgexec | ||
|
||
import ( | ||
"github.com/jmoiron/sqlx" | ||
|
||
"github.com/forbole/juno/v5/database" | ||
"github.com/forbole/juno/v5/database/postgresql" | ||
) | ||
|
||
var _ database.Migrator = &Migrator{} | ||
|
||
// Migrator represents the database migrator that should be used to migrate from v2 of the database to v3 | ||
type Migrator struct { | ||
SQL *sqlx.DB | ||
} | ||
|
||
func NewMigrator(db *postgresql.Database) *Migrator { | ||
return &Migrator{ | ||
SQL: db.SQL, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters