diff --git a/go.mod b/go.mod index 531a7ecd3f..ce6761d49e 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/docker/docker v27.0.3+incompatible github.com/docker/go-connections v0.5.0 github.com/fsouza/fake-gcs-server v1.49.2 + github.com/otiai10/copy v1.14.0 ) require ( diff --git a/go.sum b/go.sum index 13d3a0acf0..4b3d6b186e 100644 --- a/go.sum +++ b/go.sum @@ -401,6 +401,10 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= +github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= +github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= +github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 8f3e2ae9eb..a07bc0ee10 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -20,6 +20,7 @@ import ( "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/db2/schema" "github.com/stellar/go/services/horizon/internal/ingest" + "github.com/stellar/go/support/config" support "github.com/stellar/go/support/config" "github.com/stellar/go/support/datastore" "github.com/stellar/go/support/db" @@ -27,24 +28,36 @@ import ( hlog "github.com/stellar/go/support/log" ) -var runDBReingestRangeFn = runDBReingestRange - -var dbCmd = &cobra.Command{ - Use: "db [command]", - Short: "commands to manage horizon's postgres db", -} - -var dbMigrateCmd = &cobra.Command{ - Use: "migrate [command]", - Short: "commands to run schema migrations on horizon's postgres db", -} +var ( + runDBReingestRangeFn = runDBReingestRange + dbCmd *cobra.Command + dbMigrateCmd *cobra.Command + dbInitCmd *cobra.Command + dbMigrateDownCmd *cobra.Command + dbMigrateRedoCmd *cobra.Command + dbMigrateStatusCmd *cobra.Command + dbMigrateUpCmd *cobra.Command + dbReapCmd *cobra.Command + dbReingestCmd *cobra.Command + dbReingestRangeCmd *cobra.Command + dbFillGapsCmd *cobra.Command + dbDetectGapsCmd *cobra.Command + reingestForce bool + parallelWorkers uint + parallelJobSize uint32 + retries uint + retryBackoffSeconds uint + ledgerBackendStr string + storageBackendConfigPath string + ledgerBackendType ingest.LedgerBackendType +) -func requireAndSetFlags(names ...string) error { +func requireAndSetFlags(horizonFlags config.ConfigOptions, names ...string) error { set := map[string]bool{} for _, name := range names { set[name] = true } - for _, flag := range globalFlags { + for _, flag := range horizonFlags { if set[flag.Name] { flag.Require() if err := flag.SetValue(); err != nil { @@ -63,44 +76,17 @@ func requireAndSetFlags(names ...string) error { return fmt.Errorf("could not find %s flags", strings.Join(missing, ",")) } -var dbInitCmd = &cobra.Command{ - Use: "init", - Short: "install schema", - Long: "init initializes the postgres database used by horizon.", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { - return err - } - - db, err := sql.Open("postgres", globalConfig.DatabaseURL) - if err != nil { - return err - } - - numMigrationsRun, err := schema.Migrate(db, schema.MigrateUp, 0) - if err != nil { - return err - } - - if numMigrationsRun == 0 { - log.Println("No migrations applied.") - } else { - log.Printf("Successfully applied %d migrations.\n", numMigrationsRun) - } - return nil - }, -} - -func migrate(dir schema.MigrateDir, count int) error { - if !globalConfig.Ingest { +func migrate(dir schema.MigrateDir, count int, horizonConfig *horizon.Config) error { + if !horizonConfig.Ingest { log.Println("Skipping migrations because ingest flag is not enabled") return nil } - dbConn, err := db.Open("postgres", globalConfig.DatabaseURL) + dbConn, err := db.Open("postgres", horizonConfig.DatabaseURL) if err != nil { return err } + defer dbConn.Close() numMigrationsRun, err := schema.Migrate(dbConn.DB.DB, dir, count) if err != nil { @@ -115,163 +101,6 @@ func migrate(dir schema.MigrateDir, count int) error { return nil } -var dbMigrateDownCmd = &cobra.Command{ - Use: "down COUNT", - Short: "run downwards db schema migrations", - Long: "performs a downards schema migration command", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { - return err - } - - // Only allow invocations with 1 args. - if len(args) != 1 { - return ErrUsage{cmd} - } - - count, err := strconv.Atoi(args[0]) - if err != nil { - log.Println(err) - return ErrUsage{cmd} - } - - return migrate(schema.MigrateDown, count) - }, -} - -var dbMigrateRedoCmd = &cobra.Command{ - Use: "redo COUNT", - Short: "redo db schema migrations", - Long: "performs a redo schema migration command", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { - return err - } - - // Only allow invocations with 1 args. - if len(args) != 1 { - return ErrUsage{cmd} - } - - count, err := strconv.Atoi(args[0]) - if err != nil { - log.Println(err) - return ErrUsage{cmd} - } - - return migrate(schema.MigrateRedo, count) - }, -} - -var dbMigrateStatusCmd = &cobra.Command{ - Use: "status", - Short: "print current database migration status", - Long: "print current database migration status", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName); err != nil { - return err - } - - // Only allow invocations with 0 args. - if len(args) != 0 { - fmt.Println(args) - return ErrUsage{cmd} - } - - dbConn, err := db.Open("postgres", globalConfig.DatabaseURL) - if err != nil { - return err - } - - status, err := schema.Status(dbConn.DB.DB) - if err != nil { - return err - } - - fmt.Println(status) - return nil - }, -} - -var dbMigrateUpCmd = &cobra.Command{ - Use: "up [COUNT]", - Short: "run upwards db schema migrations", - Long: "performs an upwards schema migration command", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { - return err - } - - // Only allow invocations with 0-1 args. - if len(args) > 1 { - return ErrUsage{cmd} - } - - count := 0 - if len(args) == 1 { - var err error - count, err = strconv.Atoi(args[0]) - if err != nil { - log.Println(err) - return ErrUsage{cmd} - } - } - - return migrate(schema.MigrateUp, count) - }, -} - -var dbReapCmd = &cobra.Command{ - Use: "reap", - Short: "reaps (i.e. removes) any reapable history data", - Long: "reap removes any historical data that is earlier than the configured retention cutoff", - RunE: func(cmd *cobra.Command, args []string) error { - - err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}) - if err != nil { - return err - } - - session, err := db.Open("postgres", globalConfig.DatabaseURL) - if err != nil { - return fmt.Errorf("cannot open Horizon DB: %v", err) - } - defer session.Close() - - reaper := ingest.NewReaper( - ingest.ReapConfig{ - RetentionCount: uint32(globalConfig.HistoryRetentionCount), - BatchSize: uint32(globalConfig.HistoryRetentionReapCount), - }, - session, - ) - ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) - defer cancel() - return reaper.DeleteUnretainedHistory(ctx) - }, -} - -var dbReingestCmd = &cobra.Command{ - Use: "reingest", - Short: "reingest commands", - Long: "reingest ingests historical data for every ledger or ledgers specified by subcommand", - RunE: func(cmd *cobra.Command, args []string) error { - fmt.Println("Use one of the subcomands...") - return ErrUsage{cmd} - }, -} - -var ( - reingestForce bool - parallelWorkers uint - parallelJobSize uint32 - retries uint - retryBackoffSeconds uint - ledgerBackendStr string - storageBackendConfigPath string - ledgerBackendType ingest.LedgerBackendType -) - func ingestRangeCmdOpts() support.ConfigOptions { return support.ConfigOptions{ { @@ -355,138 +184,7 @@ func ingestRangeCmdOpts() support.ConfigOptions { } var dbReingestRangeCmdOpts = ingestRangeCmdOpts() -var dbReingestRangeCmd = &cobra.Command{ - Use: "range [Start sequence number] [End sequence number]", - Short: "reingests ledgers within a range", - Long: "reingests ledgers between X and Y sequence number (closed intervals)", - RunE: func(cmd *cobra.Command, args []string) error { - if err := dbReingestRangeCmdOpts.RequireE(); err != nil { - return err - } - if err := dbReingestRangeCmdOpts.SetValues(); err != nil { - return err - } - - if len(args) != 2 { - return ErrUsage{cmd} - } - - argsUInt32 := make([]uint32, 2) - for i, arg := range args { - if seq, err := strconv.ParseUint(arg, 10, 32); err != nil { - cmd.Usage() - return fmt.Errorf(`invalid sequence number "%s"`, arg) - } else { - argsUInt32[i] = uint32(seq) - } - } - - var storageBackendConfig ingest.StorageBackendConfig - options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} - if ledgerBackendType == ingest.BufferedStorageBackend { - cfg, err := toml.LoadFile(storageBackendConfigPath) - if err != nil { - return fmt.Errorf("failed to load config file %v: %w", storageBackendConfigPath, err) - } - if err = cfg.Unmarshal(&storageBackendConfig); err != nil { - return fmt.Errorf("error unmarshalling TOML config: %w", err) - } - storageBackendConfig.BufferedStorageBackendFactory = ledgerbackend.NewBufferedStorageBackend - storageBackendConfig.DataStoreFactory = datastore.NewDataStore - // when using buffered storage, performance observations have noted optimal parallel batch size - // of 100, apply that as default if the flag was absent. - if !viper.IsSet("parallel-job-size") { - parallelJobSize = 100 - } - options.NoCaptiveCore = true - } - - err := horizon.ApplyFlags(globalConfig, globalFlags, options) - if err != nil { - return err - } - return runDBReingestRangeFn( - []history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}}, - reingestForce, - parallelWorkers, - *globalConfig, - storageBackendConfig, - ) - }, -} - var dbFillGapsCmdOpts = ingestRangeCmdOpts() -var dbFillGapsCmd = &cobra.Command{ - Use: "fill-gaps [Start sequence number] [End sequence number]", - Short: "Ingests any gaps found in the horizon db", - Long: "Ingests any gaps found in the horizon db. The command takes an optional start and end parameters which restrict the range of ledgers ingested.", - RunE: func(cmd *cobra.Command, args []string) error { - if err := dbFillGapsCmdOpts.RequireE(); err != nil { - return err - } - if err := dbFillGapsCmdOpts.SetValues(); err != nil { - return err - } - - if len(args) != 0 && len(args) != 2 { - hlog.Errorf("Expected either 0 arguments or 2 but found %v arguments", len(args)) - return ErrUsage{cmd} - } - - var start, end uint64 - var withRange bool - if len(args) == 2 { - var err error - start, err = strconv.ParseUint(args[0], 10, 32) - if err != nil { - cmd.Usage() - return fmt.Errorf(`invalid sequence number "%s"`, args[0]) - } - end, err = strconv.ParseUint(args[1], 10, 32) - if err != nil { - cmd.Usage() - return fmt.Errorf(`invalid sequence number "%s"`, args[1]) - } - withRange = true - } - - var storageBackendConfig ingest.StorageBackendConfig - options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} - if ledgerBackendType == ingest.BufferedStorageBackend { - cfg, err := toml.LoadFile(storageBackendConfigPath) - if err != nil { - return fmt.Errorf("failed to load config file %v: %w", storageBackendConfigPath, err) - } - if err = cfg.Unmarshal(&storageBackendConfig); err != nil { - return fmt.Errorf("error unmarshalling TOML config: %w", err) - } - storageBackendConfig.BufferedStorageBackendFactory = ledgerbackend.NewBufferedStorageBackend - storageBackendConfig.DataStoreFactory = datastore.NewDataStore - options.NoCaptiveCore = true - } - - err := horizon.ApplyFlags(globalConfig, globalFlags, options) - if err != nil { - return err - } - var gaps []history.LedgerRange - if withRange { - gaps, err = runDBDetectGapsInRange(*globalConfig, uint32(start), uint32(end)) - if err != nil { - return err - } - hlog.Infof("found gaps %v within range [%v, %v]", gaps, start, end) - } else { - gaps, err = runDBDetectGaps(*globalConfig) - if err != nil { - return err - } - hlog.Infof("found gaps %v", gaps) - } - - return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *globalConfig, storageBackendConfig) - }, -} func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error { var err error @@ -558,35 +256,6 @@ the reingest command completes.`) return nil } -var dbDetectGapsCmd = &cobra.Command{ - Use: "detect-gaps", - Short: "detects ingestion gaps in Horizon's database", - Long: "detects ingestion gaps in Horizon's database and prints a list of reingest commands needed to fill the gaps", - RunE: func(cmd *cobra.Command, args []string) error { - if err := requireAndSetFlags(horizon.DatabaseURLFlagName); err != nil { - return err - } - - if len(args) != 0 { - return ErrUsage{cmd} - } - gaps, err := runDBDetectGaps(*globalConfig) - if err != nil { - return err - } - if len(gaps) == 0 { - hlog.Info("No gaps found") - return nil - } - fmt.Println("Horizon commands to run in order to fill in the gaps:") - cmdname := os.Args[0] - for _, g := range gaps { - fmt.Printf("%s db reingest range %d %d\n", cmdname, g.StartSequence, g.EndSequence) - } - return nil - }, -} - func runDBDetectGaps(config horizon.Config) ([]history.LedgerRange, error) { horizonSession, err := db.Open("postgres", config.DatabaseURL) if err != nil { @@ -607,7 +276,352 @@ func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history return q.GetLedgerGapsInRange(context.Background(), start, end) } -func init() { +func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, horizonFlags config.ConfigOptions) { + dbCmd = &cobra.Command{ + Use: "db [command]", + Short: "commands to manage horizon's postgres db", + } + + dbMigrateCmd = &cobra.Command{ + Use: "migrate [command]", + Short: "commands to run schema migrations on horizon's postgres db", + } + + dbInitCmd = &cobra.Command{ + Use: "init", + Short: "install schema", + Long: "init initializes the postgres database used by horizon.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { + return err + } + + db, err := sql.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return err + } + + numMigrationsRun, err := schema.Migrate(db, schema.MigrateUp, 0) + if err != nil { + return err + } + + if numMigrationsRun == 0 { + log.Println("No migrations applied.") + } else { + log.Printf("Successfully applied %d migrations.\n", numMigrationsRun) + } + return nil + }, + } + + dbMigrateDownCmd = &cobra.Command{ + Use: "down COUNT", + Short: "run downwards db schema migrations", + Long: "performs a downards schema migration command", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { + return err + } + + // Only allow invocations with 1 args. + if len(args) != 1 { + return ErrUsage{cmd} + } + + count, err := strconv.Atoi(args[0]) + if err != nil { + log.Println(err) + return ErrUsage{cmd} + } + + return migrate(schema.MigrateDown, count, horizonConfig) + }, + } + + dbMigrateRedoCmd = &cobra.Command{ + Use: "redo COUNT", + Short: "redo db schema migrations", + Long: "performs a redo schema migration command", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { + return err + } + + // Only allow invocations with 1 args. + if len(args) != 1 { + return ErrUsage{cmd} + } + + count, err := strconv.Atoi(args[0]) + if err != nil { + log.Println(err) + return ErrUsage{cmd} + } + + return migrate(schema.MigrateRedo, count, horizonConfig) + }, + } + + dbMigrateStatusCmd = &cobra.Command{ + Use: "status", + Short: "print current database migration status", + Long: "print current database migration status", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName); err != nil { + return err + } + + // Only allow invocations with 0 args. + if len(args) != 0 { + fmt.Println(args) + return ErrUsage{cmd} + } + + dbConn, err := db.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return err + } + + status, err := schema.Status(dbConn.DB.DB) + if err != nil { + return err + } + + fmt.Println(status) + return nil + }, + } + + dbMigrateUpCmd = &cobra.Command{ + Use: "up [COUNT]", + Short: "run upwards db schema migrations", + Long: "performs an upwards schema migration command", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName, horizon.IngestFlagName); err != nil { + return err + } + + // Only allow invocations with 0-1 args. + if len(args) > 1 { + return ErrUsage{cmd} + } + + count := 0 + if len(args) == 1 { + var err error + count, err = strconv.Atoi(args[0]) + if err != nil { + log.Println(err) + return ErrUsage{cmd} + } + } + + return migrate(schema.MigrateUp, count, horizonConfig) + }, + } + + dbReapCmd = &cobra.Command{ + Use: "reap", + Short: "reaps (i.e. removes) any reapable history data", + Long: "reap removes any historical data that is earlier than the configured retention cutoff", + RunE: func(cmd *cobra.Command, args []string) error { + + err := horizon.ApplyFlags(horizonConfig, horizonFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}) + if err != nil { + return err + } + + session, err := db.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return fmt.Errorf("cannot open Horizon DB: %v", err) + } + defer session.Close() + + reaper := ingest.NewReaper( + ingest.ReapConfig{ + RetentionCount: uint32(horizonConfig.HistoryRetentionCount), + BatchSize: uint32(horizonConfig.HistoryRetentionReapCount), + }, + session, + ) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer cancel() + return reaper.DeleteUnretainedHistory(ctx) + }, + } + + dbReingestCmd = &cobra.Command{ + Use: "reingest", + Short: "reingest commands", + Long: "reingest ingests historical data for every ledger or ledgers specified by subcommand", + RunE: func(cmd *cobra.Command, args []string) error { + fmt.Println("Use one of the subcomands...") + return ErrUsage{cmd} + }, + } + + dbReingestRangeCmd = &cobra.Command{ + Use: "range [Start sequence number] [End sequence number]", + Short: "reingests ledgers within a range", + Long: "reingests ledgers between X and Y sequence number (closed intervals)", + RunE: func(cmd *cobra.Command, args []string) error { + if err := dbReingestRangeCmdOpts.RequireE(); err != nil { + return err + } + if err := dbReingestRangeCmdOpts.SetValues(); err != nil { + return err + } + + if len(args) != 2 { + return ErrUsage{cmd} + } + + argsUInt32 := make([]uint32, 2) + for i, arg := range args { + if seq, err := strconv.ParseUint(arg, 10, 32); err != nil { + cmd.Usage() + return fmt.Errorf(`invalid sequence number "%s"`, arg) + } else { + argsUInt32[i] = uint32(seq) + } + } + + var storageBackendConfig ingest.StorageBackendConfig + options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} + if ledgerBackendType == ingest.BufferedStorageBackend { + cfg, err := toml.LoadFile(storageBackendConfigPath) + if err != nil { + return fmt.Errorf("failed to load config file %v: %w", storageBackendConfigPath, err) + } + if err = cfg.Unmarshal(&storageBackendConfig); err != nil { + return fmt.Errorf("error unmarshalling TOML config: %w", err) + } + storageBackendConfig.BufferedStorageBackendFactory = ledgerbackend.NewBufferedStorageBackend + storageBackendConfig.DataStoreFactory = datastore.NewDataStore + // when using buffered storage, performance observations have noted optimal parallel batch size + // of 100, apply that as default if the flag was absent. + if !viper.IsSet("parallel-job-size") { + parallelJobSize = 100 + } + options.NoCaptiveCore = true + } + + err := horizon.ApplyFlags(horizonConfig, horizonFlags, options) + if err != nil { + return err + } + return runDBReingestRangeFn( + []history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}}, + reingestForce, + parallelWorkers, + *horizonConfig, + storageBackendConfig, + ) + }, + } + + dbFillGapsCmd = &cobra.Command{ + Use: "fill-gaps [Start sequence number] [End sequence number]", + Short: "Ingests any gaps found in the horizon db", + Long: "Ingests any gaps found in the horizon db. The command takes an optional start and end parameters which restrict the range of ledgers ingested.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := dbFillGapsCmdOpts.RequireE(); err != nil { + return err + } + if err := dbFillGapsCmdOpts.SetValues(); err != nil { + return err + } + + if len(args) != 0 && len(args) != 2 { + hlog.Errorf("Expected either 0 arguments or 2 but found %v arguments", len(args)) + return ErrUsage{cmd} + } + + var start, end uint64 + var withRange bool + if len(args) == 2 { + var err error + start, err = strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Usage() + return fmt.Errorf(`invalid sequence number "%s"`, args[0]) + } + end, err = strconv.ParseUint(args[1], 10, 32) + if err != nil { + cmd.Usage() + return fmt.Errorf(`invalid sequence number "%s"`, args[1]) + } + withRange = true + } + + var storageBackendConfig ingest.StorageBackendConfig + options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false} + if ledgerBackendType == ingest.BufferedStorageBackend { + cfg, err := toml.LoadFile(storageBackendConfigPath) + if err != nil { + return fmt.Errorf("failed to load config file %v: %w", storageBackendConfigPath, err) + } + if err = cfg.Unmarshal(&storageBackendConfig); err != nil { + return fmt.Errorf("error unmarshalling TOML config: %w", err) + } + storageBackendConfig.BufferedStorageBackendFactory = ledgerbackend.NewBufferedStorageBackend + storageBackendConfig.DataStoreFactory = datastore.NewDataStore + options.NoCaptiveCore = true + } + + err := horizon.ApplyFlags(horizonConfig, horizonFlags, options) + if err != nil { + return err + } + var gaps []history.LedgerRange + if withRange { + gaps, err = runDBDetectGapsInRange(*horizonConfig, uint32(start), uint32(end)) + if err != nil { + return err + } + hlog.Infof("found gaps %v within range [%v, %v]", gaps, start, end) + } else { + gaps, err = runDBDetectGaps(*horizonConfig) + if err != nil { + return err + } + hlog.Infof("found gaps %v", gaps) + } + + return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *horizonConfig, storageBackendConfig) + }, + } + + dbDetectGapsCmd = &cobra.Command{ + Use: "detect-gaps", + Short: "detects ingestion gaps in Horizon's database", + Long: "detects ingestion gaps in Horizon's database and prints a list of reingest commands needed to fill the gaps", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName); err != nil { + return err + } + + if len(args) != 0 { + return ErrUsage{cmd} + } + gaps, err := runDBDetectGaps(*horizonConfig) + if err != nil { + return err + } + if len(gaps) == 0 { + hlog.Info("No gaps found") + return nil + } + fmt.Println("Horizon commands to run in order to fill in the gaps:") + cmdname := os.Args[0] + for _, g := range gaps { + fmt.Printf("%s db reingest range %d %d\n", cmdname, g.StartSequence, g.EndSequence) + } + return nil + }, + } + if err := dbReingestRangeCmdOpts.Init(dbReingestRangeCmd); err != nil { log.Fatal(err.Error()) } @@ -618,7 +632,7 @@ func init() { viper.BindPFlags(dbReingestRangeCmd.PersistentFlags()) viper.BindPFlags(dbFillGapsCmd.PersistentFlags()) - RootCmd.AddCommand(dbCmd) + rootCmd.AddCommand(dbCmd) dbCmd.AddCommand( dbInitCmd, dbMigrateCmd, @@ -635,3 +649,7 @@ func init() { ) dbReingestCmd.AddCommand(dbReingestRangeCmd) } + +func init() { + DefineDBCommands(RootCmd, globalConfig, globalFlags) +} diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go index dea3e3777f..d3fbcaf345 100644 --- a/services/horizon/cmd/db_test.go +++ b/services/horizon/cmd/db_test.go @@ -3,6 +3,7 @@ package cmd import ( "testing" + "github.com/spf13/cobra" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -19,21 +20,8 @@ func TestDBCommandsTestSuite(t *testing.T) { type DBCommandsTestSuite struct { suite.Suite - dsn string -} - -func (s *DBCommandsTestSuite) SetupTest() { - resetFlags() -} - -func resetFlags() { - RootCmd.ResetFlags() - dbFillGapsCmd.ResetFlags() - dbReingestRangeCmd.ResetFlags() - - globalFlags.Init(RootCmd) - dbFillGapsCmdOpts.Init(dbFillGapsCmd) - dbReingestRangeCmdOpts.Init(dbReingestRangeCmd) + db *dbtest.DB + rootCmd *cobra.Command } func (s *DBCommandsTestSuite) SetupSuite() { @@ -42,18 +30,25 @@ func (s *DBCommandsTestSuite) SetupSuite() { return nil } - newDB := dbtest.Postgres(s.T()) - s.dsn = newDB.DSN + s.db = dbtest.Postgres(s.T()) RootCmd.SetArgs([]string{ - "db", "migrate", "up", "--db-url", s.dsn}) + "db", "migrate", "up", "--db-url", s.db.DSN}) require.NoError(s.T(), RootCmd.Execute()) } +func (s *DBCommandsTestSuite) TearDownSuite() { + s.db.Close() +} + +func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) { + s.rootCmd = NewRootCmd() +} + func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() { - RootCmd.SetArgs([]string{ + s.rootCmd.SetArgs([]string{ "db", "reingest", "range", - "--db-url", s.dsn, + "--db-url", s.db.DSN, "--network", "testnet", "--parallel-workers", "2", "--ledgerbackend", "datastore", @@ -61,14 +56,14 @@ func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() { "2", "10"}) - require.NoError(s.T(), dbReingestRangeCmd.Execute()) + require.NoError(s.T(), s.rootCmd.Execute()) require.Equal(s.T(), parallelJobSize, uint32(100)) } func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() { - RootCmd.SetArgs([]string{ + s.rootCmd.SetArgs([]string{ "db", "reingest", "range", - "--db-url", s.dsn, + "--db-url", s.db.DSN, "--network", "testnet", "--stellar-core-binary-path", "/test/core/bin/path", "--parallel-workers", "2", @@ -76,14 +71,14 @@ func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() { "2", "10"}) - require.NoError(s.T(), RootCmd.Execute()) + require.NoError(s.T(), s.rootCmd.Execute()) require.Equal(s.T(), parallelJobSize, uint32(100_000)) } func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() { - RootCmd.SetArgs([]string{ + s.rootCmd.SetArgs([]string{ "db", "reingest", "range", - "--db-url", s.dsn, + "--db-url", s.db.DSN, "--network", "testnet", "--stellar-core-binary-path", "/test/core/bin/path", "--parallel-workers", "2", @@ -92,14 +87,14 @@ func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() { "2", "10"}) - require.NoError(s.T(), RootCmd.Execute()) + require.NoError(s.T(), s.rootCmd.Execute()) require.Equal(s.T(), parallelJobSize, uint32(5)) } func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() { - RootCmd.SetArgs([]string{ + s.rootCmd.SetArgs([]string{ "db", "reingest", "range", - "--db-url", s.dsn, + "--db-url", s.db.DSN, "--network", "testnet", "--parallel-workers", "2", "--parallel-job-size", "5", @@ -108,7 +103,7 @@ func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() { "2", "10"}) - require.NoError(s.T(), RootCmd.Execute()) + require.NoError(s.T(), s.rootCmd.Execute()) require.Equal(s.T(), parallelJobSize, uint32(5)) } @@ -249,21 +244,21 @@ func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() { for _, command := range commands { for _, tt := range tests { s.T().Run(tt.name+"_"+command.name, func(t *testing.T) { - resetFlags() + rootCmd := NewRootCmd() var args []string args = append(command.cmd, tt.args...) - RootCmd.SetArgs(append([]string{ - "--db-url", s.dsn, + rootCmd.SetArgs(append([]string{ + "--db-url", s.db.DSN, "--stellar-core-binary-path", "/test/core/bin/path", }, args...)) if tt.expectError { - err := RootCmd.Execute() + err := rootCmd.Execute() require.Error(t, err) require.Contains(t, err.Error(), tt.errorMessage) } else { - require.NoError(t, RootCmd.Execute()) + require.NoError(t, rootCmd.Execute()) } }) } diff --git a/services/horizon/cmd/root.go b/services/horizon/cmd/root.go index d2900496d4..099979c97b 100644 --- a/services/horizon/cmd/root.go +++ b/services/horizon/cmd/root.go @@ -12,7 +12,13 @@ import ( var ( globalConfig, globalFlags = horizon.Flags() - RootCmd = &cobra.Command{ + RootCmd = createRootCmd(globalConfig, globalFlags) + originalHelpFunc = RootCmd.HelpFunc() + originalUsageFunc = RootCmd.UsageFunc() +) + +func createRootCmd(horizonConfig *horizon.Config, configOptions config.ConfigOptions) *cobra.Command { + return &cobra.Command{ Use: "horizon", Short: "client-facing api server for the Stellar network", SilenceErrors: true, @@ -23,16 +29,44 @@ var ( "DEPRECATED - the use of command-line flags has been deprecated in favor of environment variables. Please" + "consult our Configuring section in the developer documentation on how to use them - https://developers.stellar.org/docs/run-api-server/configuring", RunE: func(cmd *cobra.Command, args []string) error { - app, err := horizon.NewAppFromFlags(globalConfig, globalFlags) + app, err := horizon.NewAppFromFlags(horizonConfig, configOptions) if err != nil { return err } return app.Serve() }, } - originalHelpFunc = RootCmd.HelpFunc() - originalUsageFunc = RootCmd.UsageFunc() -) +} + +func initRootCmd(cmd *cobra.Command, + originalHelpFn func(*cobra.Command, []string), + originalUsageFn func(*cobra.Command) error, + horizonGlobalFlags config.ConfigOptions) { + // override the default help output, apply further filtering on which global flags + // will be shown on the help outout dependent on the command help was issued upon. + cmd.SetHelpFunc(func(c *cobra.Command, args []string) { + enableGlobalOptionsInHelp(c, horizonGlobalFlags) + originalHelpFn(c, args) + }) + + cmd.SetUsageFunc(func(c *cobra.Command) error { + enableGlobalOptionsInHelp(c, horizonGlobalFlags) + return originalUsageFn(c) + }) + + err := horizonGlobalFlags.Init(cmd) + if err != nil { + stdLog.Fatal(err.Error()) + } +} + +func NewRootCmd() *cobra.Command { + horizonGlobalConfig, horizonGlobalFlags := horizon.Flags() + cmd := createRootCmd(horizonGlobalConfig, horizonGlobalFlags) + initRootCmd(cmd, cmd.HelpFunc(), cmd.UsageFunc(), horizonGlobalFlags) + DefineDBCommands(cmd, horizonGlobalConfig, horizonGlobalFlags) + return cmd +} // ErrUsage indicates we should print the usage string and exit with code 1 type ErrUsage struct { @@ -51,23 +85,7 @@ func (e ErrExitCode) Error() string { } func init() { - - // override the default help output, apply further filtering on which global flags - // will be shown on the help outout dependent on the command help was issued upon. - RootCmd.SetHelpFunc(func(c *cobra.Command, args []string) { - enableGlobalOptionsInHelp(c, globalFlags) - originalHelpFunc(c, args) - }) - - RootCmd.SetUsageFunc(func(c *cobra.Command) error { - enableGlobalOptionsInHelp(c, globalFlags) - return originalUsageFunc(c) - }) - - err := globalFlags.Init(RootCmd) - if err != nil { - stdLog.Fatal(err.Error()) - } + initRootCmd(RootCmd, originalHelpFunc, originalUsageFunc, globalFlags) } func Execute() error { diff --git a/services/horizon/internal/integration/db_test.go b/services/horizon/internal/integration/db_test.go index 1f1d2277ec..86a86a8055 100644 --- a/services/horizon/internal/integration/db_test.go +++ b/services/horizon/internal/integration/db_test.go @@ -3,16 +3,24 @@ package integration import ( "context" "fmt" + "net" + "os" "path/filepath" "strconv" "testing" "time" + "github.com/fsouza/fake-gcs-server/fakestorage" + cp "github.com/otiai10/copy" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stellar/go/clients/horizonclient" + sdk "github.com/stellar/go/clients/horizonclient" "github.com/stellar/go/historyarchive" "github.com/stellar/go/keypair" + "github.com/stellar/go/network" hProtocol "github.com/stellar/go/protocols/horizon" horizoncmd "github.com/stellar/go/services/horizon/cmd" horizon "github.com/stellar/go/services/horizon/internal" @@ -25,6 +33,7 @@ import ( "github.com/stellar/go/support/db/dbtest" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" + "github.com/stellar/throttled" ) func submitLiquidityPoolOps(itest *integration.Test, tt *assert.Assertions) (submittedOperations []txnbuild.Operation, lastLedger int32) { @@ -485,7 +494,8 @@ func TestReingestDB(t *testing.T) { horizonConfig := itest.GetHorizonIngestConfig() t.Run("validate parallel range", func(t *testing.T) { - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "reingest", "range", @@ -494,7 +504,7 @@ func TestReingestDB(t *testing.T) { "2", )) - assert.EqualError(t, horizoncmd.RootCmd.Execute(), "Invalid range: {10 2} from > to") + assert.EqualError(t, rootCmd.Execute(), "Invalid range: {10 2} from > to") }) t.Logf("reached ledger is %v", reachedLedger) @@ -537,7 +547,8 @@ func TestReingestDB(t *testing.T) { "captive-core-reingest-range-integration-tests.cfg", ) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "reingest", "range", "--parallel-workers=1", @@ -545,8 +556,135 @@ func TestReingestDB(t *testing.T) { fmt.Sprintf("%d", toLedger), )) - tt.NoError(horizoncmd.RootCmd.Execute()) - tt.NoError(horizoncmd.RootCmd.Execute(), "Repeat the same reingest range against db, should not have errors.") + tt.NoError(rootCmd.Execute()) + tt.NoError(rootCmd.Execute(), "Repeat the same reingest range against db, should not have errors.") +} + +func TestReingestDatastore(t *testing.T) { + if os.Getenv("HORIZON_INTEGRATION_TESTS_ENABLED") == "" { + t.Skip("skipping integration test: HORIZON_INTEGRATION_TESTS_ENABLED not set") + } + + newDB := dbtest.Postgres(t) + defer newDB.Close() + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs([]string{ + "db", "migrate", "up", "--db-url", newDB.DSN}) + require.NoError(t, rootCmd.Execute()) + + testTempDir := t.TempDir() + tempSeedDataPath := filepath.Join(testTempDir, "data") + tempSeedBucketPath := filepath.Join(tempSeedDataPath, "path", "to", "my", "bucket") + tempSeedBucketFolder := filepath.Join(tempSeedBucketPath, "FFFFFFFF--0-63999") + if err := os.MkdirAll(tempSeedBucketFolder, 0777); err != nil { + t.Fatalf("unable to create seed data in temp path, %v", err) + } + + err := cp.Copy("./testdata/testbucket", tempSeedBucketFolder) + if err != nil { + t.Fatalf("unable to copy seed data files for fake gcs, %v", err) + } + + testWriter := &testWriter{test: t} + opts := fakestorage.Options{ + Scheme: "http", + Host: "127.0.0.1", + Port: uint16(0), + Writer: testWriter, + Seed: tempSeedDataPath, + StorageRoot: filepath.Join(testTempDir, "bucket"), + PublicHost: "127.0.0.1", + } + + gcsServer, err := fakestorage.NewServerWithOptions(opts) + + if err != nil { + t.Fatalf("couldn't start the fake gcs http server %v", err) + } + + defer gcsServer.Stop() + t.Logf("fake gcs server started at %v", gcsServer.URL()) + t.Setenv("STORAGE_EMULATOR_HOST", gcsServer.URL()) + + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs([]string{"db", + "reingest", + "range", + "--db-url", newDB.DSN, + "--network", "testnet", + "--parallel-workers", "1", + "--ledgerbackend", "datastore", + "--datastore-config", "../../config.storagebackend.toml", + "997", + "999"}) + + require.NoError(t, rootCmd.Execute()) + + listener, webApp, webPort, err := dynamicHorizonWeb(newDB.DSN) + if err != nil { + t.Fatalf("couldn't create and start horizon web app on dynamic port %v", err) + } + + webAppDone := make(chan struct{}) + go func() { + defer close(webAppDone) + if err = listener.Close(); err != nil { + return + } + webApp.Serve() + }() + + defer func() { + webApp.Close() + select { + case <-webAppDone: + return + default: + } + }() + + horizonClient := &sdk.Client{ + HorizonURL: fmt.Sprintf("http://localhost:%v", webPort), + } + + // wait until the web server is up before continuing to test requests + require.Eventually(t, func() bool { + if _, horizonErr := horizonClient.Root(); horizonErr != nil { + return false + } + return true + }, time.Second*15, time.Millisecond*100) + + _, err = horizonClient.LedgerDetail(998) + require.NoError(t, err) +} + +func dynamicHorizonWeb(dsn string) (net.Listener, *horizon.App, uint, error) { + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, nil, 0, err + } + webPort := uint(listener.Addr().(*net.TCPAddr).Port) + + webApp, err := horizon.NewApp(horizon.Config{ + DatabaseURL: dsn, + Port: webPort, + NetworkPassphrase: network.TestNetworkPassphrase, + LogLevel: logrus.InfoLevel, + DisableTxSub: true, + Ingest: false, + ConnectionTimeout: 10 * time.Second, + RateQuota: &throttled.RateQuota{ + MaxRate: throttled.PerHour(1000), + MaxBurst: 100, + }, + }) + if err != nil { + listener.Close() + return nil, nil, 0, err + } + + return listener, webApp, webPort, nil } func TestReingestDBWithFilterRules(t *testing.T) { @@ -648,22 +786,24 @@ func TestReingestDBWithFilterRules(t *testing.T) { itest.StopHorizon() // clear the db with reaping all ledgers - horizoncmd.RootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", "reap", "--history-retention-count=1", )) - tt.NoError(horizoncmd.RootCmd.Execute()) + tt.NoError(rootCmd.Execute()) // repopulate the db with reingestion which should catchup using core reapply filter rules // correctly on reingestion ranged - horizoncmd.RootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", "reingest", "range", "1", fmt.Sprintf("%d", reachedLedger), )) - tt.NoError(horizoncmd.RootCmd.Execute()) + tt.NoError(rootCmd.Execute()) // bring up horizon, just the api server no ingestion, to query // for tx's that should have been repopulated on db from reingestion per @@ -733,12 +873,13 @@ func TestMigrateIngestIsTrueByDefault(t *testing.T) { newDB := dbtest.Postgres(t) freshHorizonPostgresURL := newDB.DSN - horizoncmd.RootCmd.SetArgs([]string{ + rootCmd := horizoncmd.NewRootCmd() + rootCmd.SetArgs([]string{ // ingest is set to true by default "--db-url", freshHorizonPostgresURL, "db", "migrate", "up", }) - tt.NoError(horizoncmd.RootCmd.Execute()) + tt.NoError(rootCmd.Execute()) dbConn, err := db.Open("postgres", freshHorizonPostgresURL) tt.NoError(err) @@ -754,12 +895,13 @@ func TestMigrateChecksIngestFlag(t *testing.T) { newDB := dbtest.Postgres(t) freshHorizonPostgresURL := newDB.DSN - horizoncmd.RootCmd.SetArgs([]string{ + rootCmd := horizoncmd.NewRootCmd() + rootCmd.SetArgs([]string{ "--ingest=false", "--db-url", freshHorizonPostgresURL, "db", "migrate", "up", }) - tt.NoError(horizoncmd.RootCmd.Execute()) + tt.NoError(rootCmd.Execute()) dbConn, err := db.Open("postgres", freshHorizonPostgresURL) tt.NoError(err) @@ -802,7 +944,8 @@ func TestFillGaps(t *testing.T) { tt.NoError(err) t.Run("validate parallel range", func(t *testing.T) { - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, + var rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "--parallel-workers=2", @@ -810,7 +953,7 @@ func TestFillGaps(t *testing.T) { "2", )) - assert.EqualError(t, horizoncmd.RootCmd.Execute(), "Invalid range: {10 2} from > to") + assert.EqualError(t, rootCmd.Execute(), "Invalid range: {10 2} from > to") }) // make sure a full checkpoint has elapsed otherwise there will be nothing to reingest @@ -842,21 +985,25 @@ func TestFillGaps(t *testing.T) { filepath.Dir(horizonConfig.CaptiveCoreConfigPath), "captive-core-reingest-range-integration-tests.cfg", ) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "--parallel-workers=1")) - tt.NoError(horizoncmd.RootCmd.Execute()) + + rootCmd := horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "--parallel-workers=1")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.Equal(int64(0), latestLedger) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "3", "4")) - tt.NoError(horizoncmd.RootCmd.Execute()) + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "3", "4")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.Equal(int64(3), oldestLedger) tt.Equal(int64(4), latestLedger) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "6", "7")) - tt.NoError(horizoncmd.RootCmd.Execute()) + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "6", "7")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.Equal(int64(3), oldestLedger) @@ -866,8 +1013,9 @@ func TestFillGaps(t *testing.T) { tt.NoError(err) tt.Equal([]history.LedgerRange{{StartSequence: 5, EndSequence: 5}}, gaps) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps")) - tt.NoError(horizoncmd.RootCmd.Execute()) + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.Equal(int64(3), oldestLedger) @@ -876,8 +1024,9 @@ func TestFillGaps(t *testing.T) { tt.NoError(err) tt.Empty(gaps) - horizoncmd.RootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "2", "8")) - tt.NoError(horizoncmd.RootCmd.Execute()) + rootCmd = horizoncmd.NewRootCmd() + rootCmd.SetArgs(command(t, horizonConfig, "db", "fill-gaps", "2", "8")) + tt.NoError(rootCmd.Execute()) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.Equal(int64(2), oldestLedger) @@ -905,3 +1054,12 @@ func TestResumeFromInitializedDB(t *testing.T) { tt.Eventually(successfullyResumed, 1*time.Minute, 1*time.Second) } + +type testWriter struct { + test *testing.T +} + +func (w *testWriter) Write(p []byte) (n int, err error) { + w.test.Log(string(p)) + return len(p), nil +} diff --git a/services/horizon/internal/integration/testdata/testbucket/FFFFFC18--999.xdr.zstd b/services/horizon/internal/integration/testdata/testbucket/FFFFFC18--999.xdr.zstd new file mode 100644 index 0000000000..b2627e7fc1 Binary files /dev/null and b/services/horizon/internal/integration/testdata/testbucket/FFFFFC18--999.xdr.zstd differ diff --git a/services/horizon/internal/integration/testdata/testbucket/FFFFFC19--998.xdr.zstd b/services/horizon/internal/integration/testdata/testbucket/FFFFFC19--998.xdr.zstd new file mode 100644 index 0000000000..01fb99ae1f Binary files /dev/null and b/services/horizon/internal/integration/testdata/testbucket/FFFFFC19--998.xdr.zstd differ diff --git a/services/horizon/internal/integration/testdata/testbucket/FFFFFC1A--997.xdr.zstd b/services/horizon/internal/integration/testdata/testbucket/FFFFFC1A--997.xdr.zstd new file mode 100644 index 0000000000..9a509579c7 Binary files /dev/null and b/services/horizon/internal/integration/testdata/testbucket/FFFFFC1A--997.xdr.zstd differ