Skip to content

Commit

Permalink
flushing events
Browse files Browse the repository at this point in the history
  • Loading branch information
Scooletz committed Nov 29, 2024
1 parent f099f27 commit 717ccd6
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 15 deletions.
20 changes: 10 additions & 10 deletions src/Paprika.Cli/VerifyWholeTreeSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ public class Command : Command<VerifyWholeTreeSettings>
{
public override int Execute(CommandContext context, VerifyWholeTreeSettings settings)
{
using var db = settings.BuildDb();

using var read = db.BeginReadOnlyBatch();
using var latest = Blockchain.StartReadOnlyLatestFromDb(db);

AnsiConsole.WriteLine("Checking whole tree...");

var keccak = new ComputeMerkleBehavior().CalculateStateRootHash(latest);

AnsiConsole.WriteLine($"Keccak {keccak.ToString()}");
// using var db = settings.BuildDb();
//
// using var read = db.BeginReadOnlyBatch();
// using var latest = Blockchain.StartReadOnlyLatestFromDb(db);
//
// AnsiConsole.WriteLine("Checking whole tree...");
//
// var keccak = new ComputeMerkleBehavior().CalculateStateRootHash(read);
//
// AnsiConsole.WriteLine($"Keccak {keccak.ToString()}");


return 0;
Expand Down
14 changes: 11 additions & 3 deletions src/Paprika/Chain/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,20 @@ public void VerifyDbIntegrityOnCommit()
/// <summary>
/// Announces the last block number that was flushed to disk.
/// </summary>
public event EventHandler<(uint blockNumber, Keccak blockHash)> Flushed;
public event EventHandler<(uint blockNumber, Keccak blockHash)> Flushed
{
add => _chain.Flushed += value;
remove => _chain.Flushed -= value;
}

/// <summary>
/// The flusher failed.
/// </summary>
public event EventHandler<Exception> FlusherFailure;
public event EventHandler<Exception> FlusherFailure
{
add => _chain.FlusherFailure += value;
remove => _chain.FlusherFailure -= value;
}

public IWorldState StartNew(Keccak parentKeccak) => new BlockState(_chain.Begin(parentKeccak), this);

Expand Down Expand Up @@ -1233,4 +1241,4 @@ private static Span<byte> ReadStorage(ReadOnlySpan<byte> data, Span<byte> destin
data.CopyTo(destination);
return destination.Slice(0, data.Length);
}
}
}
2 changes: 1 addition & 1 deletion src/Paprika/Merkle/ComputeMerkleBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public ComputeMerkleBehavior(int maxDegreeOfParallelism = ParallelismUnlimited)

/// <summary>
/// Calculates state root hash, passing through all the account and storage tries and building a new value
/// that is not based on any earlier calculation. It's time consuming.
/// that is not based on any earlier calculation. It's time-consuming.
/// </summary>
public Keccak CalculateStateRootHash(IReadOnlyCommit commit)
{
Expand Down
35 changes: 34 additions & 1 deletion src/Paprika/Store/PagedDb.MultiHeadChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ private class MultiHeadChain : IMultiHeadChain
// Metrics
private readonly MetricsExtensions.IAtomicIntGauge _flusherQueueCount;

/// <summary>
/// Announces the last block number that was flushed to disk.
/// </summary>
public event EventHandler<(uint blockNumber, Keccak blockHash)> Flushed;

/// <summary>
/// The flusher failed.
/// </summary>
public event EventHandler<Exception> FlusherFailure;

public MultiHeadChain(PagedDb db)
{
_db = db;
Expand Down Expand Up @@ -277,7 +287,8 @@ private async Task FlusherTask()
{
try
{
const CommitOptions options = CommitOptions.FlushDataOnly;
// No flushing for batches atm.
const CommitOptions options = CommitOptions.DangerNoFlush;

Debug.Assert(toFinalize.batches[0].BatchId == _lastCommittedBatch + 1);

Expand Down Expand Up @@ -332,14 +343,26 @@ private async Task FlusherTask()
_flusherQueueCount.Set(reader.Count);
}

if (reader.TryPeek(out _) == false)
{
// For now, perform sync only if there is no other coming.
// TODO: This should take into consideration the actual number of blocks written so far
_db.Flush();
}

Flushed?.Invoke(this, new ValueTuple<uint, Keccak>());
toFinalize.tcs.SetResult();
}
catch (Exception e)
{
toFinalize.tcs.SetException(e);
FlusherFailure?.Invoke(this, e);
}
}
}

// Sync before closing
_db.Flush();
}

private Task RegisterNewReaderAfterFinalization(List<ProposedBatch> removed, Reader newReader)
Expand Down Expand Up @@ -876,4 +899,14 @@ public interface IMultiHeadChain : IAsyncDisposable
Task Finalize(Keccak keccak);

bool HasState(in Keccak keccak);

/// <summary>
/// Announces the last block number that was flushed to disk.
/// </summary>
event EventHandler<(uint blockNumber, Keccak blockHash)> Flushed;

/// <summary>
/// The flusher failed.
/// </summary>
event EventHandler<Exception> FlusherFailure;
}

0 comments on commit 717ccd6

Please sign in to comment.