Skip to content

Commit

Permalink
Reader desposal made awaitable and properly handled in the FlusherTask
Browse files Browse the repository at this point in the history
  • Loading branch information
Scooletz committed Nov 28, 2024
1 parent d89e30a commit aeb0f13
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/Paprika.Tests/ReadForbiddingDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class ReadForbiddingDb(IDb db) : IDb
public IReadOnlyBatch BeginReadOnlyBatchOrLatest(in Keccak stateHash, string name = "") =>
new ReadOnlyBatch(db.BeginReadOnlyBatchOrLatest(in stateHash, name), this);

public IReadOnlyBatch[] SnapshotAll() => db.SnapshotAll();
public IReadOnlyBatch[] SnapshotAll(bool withoutOldest = false) => db.SnapshotAll(withoutOldest);

public bool HasState(in Keccak keccak) => db.HasState(in keccak);
public int HistoryDepth => db.HistoryDepth;
Expand Down
75 changes: 75 additions & 0 deletions src/Paprika.Tests/Store/MultiHeadChainTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,79 @@ void Assert()
}
}
}

[Test]
public async Task Old_reader_kept_alive_keeps_finalization_from_processing()
{
const byte blocks = 64;

using var db = PagedDb.NativeMemoryDb(1 * Mb, 2);

var random = new Random(Seed);

await using var multi = db.OpenMultiHeadChain();

using var head = multi.Begin(Keccak.EmptyTreeHash);

var finalized = new List<Task>();

var account = Keccak.OfAnEmptySequenceRlp;

byte[] expectedReaderValue = [];
byte[] lastWrittenValue = [];
Keccak lastWrittenKeccak = default;

IHeadReader? reader = null;
for (byte i = 0; i < blocks; i++)
{
lastWrittenKeccak = random.NextKeccak();

lastWrittenValue = [i];
head.SetRaw(Key.Account(account), lastWrittenValue);
head.Commit((uint)(i + 1), lastWrittenKeccak);

if (i == 0)
{
expectedReaderValue = lastWrittenValue;

// the first block should be set and finalized
await multi.Finalize(lastWrittenKeccak);
multi.TryLeaseReader(lastWrittenKeccak, out reader).Should().BeTrue();
}
else
{
// Just register finalization, it won't be finalized as the reader will keep it from going on
finalized.Add(multi.Finalize(lastWrittenKeccak));
}
}

AssertReader(reader, account, expectedReaderValue);

finalized.Should().AllSatisfy(t => t.Status.Should().Be(TaskStatus.WaitingForActivation));

reader!.Dispose();
await reader.CleanedUp;
await Task.WhenAll(finalized);

// Everything is finalized and written, try read the last value now
AssertLastWrittenValue(multi, lastWrittenKeccak, account, lastWrittenValue);

return;

static void AssertReader(IHeadReader? reader, Keccak account, byte[] expectedReaderValue)
{
reader.Should().NotBeNull();

reader!.TryGet(Key.Account(account), out var actualReaderValue).Should().BeTrue();
actualReaderValue.SequenceEqual(expectedReaderValue).Should().BeTrue();
}

static void AssertLastWrittenValue(IMultiHeadChain multi, Keccak stateHash, Keccak account, byte[] lastWrittenValue)
{
multi.TryLeaseReader(stateHash, out var lastReader).Should().BeTrue();
lastReader.TryGet(Key.Account(account), out var actualLastWrittenValue).Should().BeTrue();
actualLastWrittenValue.SequenceEqual(lastWrittenValue).Should().BeTrue();
lastReader.Dispose();
}
}
}
2 changes: 1 addition & 1 deletion src/Paprika/IDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface IDb
/// Performs a snapshot of all the valid roots in the database.
/// </summary>
/// <returns>An array of roots.</returns>
IReadOnlyBatch[] SnapshotAll();
IReadOnlyBatch[] SnapshotAll(bool withoutOldest = false);

/// <summary>
/// Whether there's a state with the given keccak.
Expand Down
45 changes: 31 additions & 14 deletions src/Paprika/Store/PagedDb.MultiHeadChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ private class MultiHeadChain : IMultiHeadChain

// Readers
private readonly Dictionary<Keccak, Reader> _readers = new();
private readonly Queue<Reader> _readersDisposalQueue = new();
private readonly ReaderWriterLockSlim _readerLock = new();

// Proposed batches that are finalized
Expand Down Expand Up @@ -67,11 +68,19 @@ public MultiHeadChain(PagedDb db)

_pool = _db._pool;

foreach (var batch in _db.SnapshotAll())
// Snapshot all without the oldest, we want to keep only N-1 readers
var allWithoutTheOldest = _db.SnapshotAll(withoutOldest: true);

// Sort from oldest to youngest
Array.Sort(allWithoutTheOldest, (a, b) => a.BatchId.CompareTo(b.BatchId));

foreach (var batch in allWithoutTheOldest)
{
_lastCommittedBatch = Math.Max(batch.BatchId, _lastCommittedBatch);
var reader = new Reader(_db, CreateNextRoot([], (ReadOnlyBatch)batch), batch, [], _pool);
RegisterReader(reader);

_readersDisposalQueue.Enqueue(reader);
}

_flusher = FlusherTask();
Expand Down Expand Up @@ -263,7 +272,7 @@ private async Task FlusherTask()
await _db._manager.WritePages(batch.Changes, options);

// Set new root
var (previousRootStateHash, newRootPage) = _db.SetNewRoot(batch.Root);
var newRootPage = _db.SetNewRoot(batch.Root);

// report
_db.ReportDbSize(GetRootSizeInMb(batch.Root));
Expand All @@ -272,7 +281,7 @@ private async Task FlusherTask()

List<ProposedBatch> removed;

Reader newIReader;
Reader newReader;
lock (_db._batchLock)
{
_db.CommitNewRoot();
Expand All @@ -290,10 +299,11 @@ private async Task FlusherTask()

var read = BuildDependencies(batch.Root.Data.Metadata.StateHash, out var root, out _,
out var proposed);
newIReader = new Reader(_db, root, read, proposed, _pool);
newReader = new Reader(_db, root, read, proposed, _pool);
}

RegisterNewReaderAfterFinalization(removed, newIReader, previousRootStateHash);
// Register the new reader and await the disposal of the oldest one.
await RegisterNewReaderAfterFinalization(removed, newReader);

// Only now dispose the removed as they had their data used above.
foreach (var b in removed)
Expand All @@ -315,33 +325,34 @@ private async Task FlusherTask()
}
}

private void RegisterNewReaderAfterFinalization(List<ProposedBatch> removed, Reader newReader,
Keccak previousRootStateHash)
private Task RegisterNewReaderAfterFinalization(List<ProposedBatch> removed, Reader newReader)
{
var toDispose = new List<Reader>();
Reader? oldest = null;

// Update readers
_readerLock.EnterWriteLock();
try
{
Reader? headReader;

// Remove the previous
foreach (var b in removed)
{
if (_readers.Remove(b.StateHash, out headReader))
if (_readers.Remove(b.StateHash, out var headReader))
{
toDispose.Add(headReader);
}
}

// Register the new one
// Register the new one in the dictionary and in the disposal queue
_readers[newReader.Metadata.StateHash] = newReader;

// Remove the previous state root if exists
if (_readers.Remove(previousRootStateHash, out headReader))
_readersDisposalQueue.Enqueue(newReader);

if (_readersDisposalQueue.Count == _db._historyDepth)
{
toDispose.Add(headReader);
// Ensure that we keep only N-1 readers for the history, so that the next spin can copy over to the root.
oldest = _readersDisposalQueue.Dequeue();
_readers.Remove(oldest.Metadata.StateHash);
}
}
finally
Expand All @@ -354,6 +365,12 @@ private void RegisterNewReaderAfterFinalization(List<ProposedBatch> removed, Rea
{
reader.Dispose();
}

if (oldest == null)
return Task.CompletedTask;

oldest.Dispose();
return oldest.CleanedUp;
}

private ProposedBatch FindProposed(Keccak keccak)
Expand Down
24 changes: 12 additions & 12 deletions src/Paprika/Store/PagedDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,15 @@ public IReadOnlyBatch BeginReadOnlyBatchOrLatest(in Keccak stateHash, string nam
}
}

public IReadOnlyBatch[] SnapshotAll()
public IReadOnlyBatch[] SnapshotAll(bool withoutOldest = false)
{
var batches = new List<IReadOnlyBatch>();

var limit = withoutOldest ? _historyDepth - 1 : _historyDepth;

lock (_batchLock)
{
for (var back = 0; back < _historyDepth; back++)
for (var back = 0; back < limit; back++)
{
if (_lastRoot - back < 0)
{
Expand Down Expand Up @@ -395,7 +397,6 @@ private void DisposeReadOnlyBatch(ReadOnlyBatch batch)
{
_lowestReadTxBatch.Set((int)_batchesReadOnly.Min(b => b.BatchId));
}

}
}

Expand Down Expand Up @@ -425,7 +426,8 @@ private IBatch BuildFromRoot(RootPage current)

[DoesNotReturn]
[StackTraceHidden]
private static void ThrowOnlyOneBatch() => throw new Exception("There is another batch active at the moment. Commit the other first");
private static void ThrowOnlyOneBatch() =>
throw new Exception("There is another batch active at the moment. Commit the other first");

[DoesNotReturn]
[StackTraceHidden]
Expand Down Expand Up @@ -504,20 +506,17 @@ private static RootPage CreateNextRoot(RootPage current, BufferPool pool)
/// <summary>
/// Sets the new root but does not bump the _lastRoot that should be done in a lock.
/// </summary>
private (Keccak previousRootStateHash, DbAddress newRootPage) SetNewRoot(RootPage root)
private DbAddress SetNewRoot(RootPage root)
{
var pageAddress = (_lastRoot + 1) % _historyDepth;
var destination = _roots[pageAddress];

var previousStateHash = destination.Data.Metadata.StateHash;

root.CopyTo(destination);
return (previousStateHash, DbAddress.Page((uint)pageAddress));

return DbAddress.Page((uint)pageAddress);
}

private void CommitNewRoot() => _lastRoot += 1;


private sealed class ReadOnlyBatch(PagedDb db, RootPage root, string name)
: IVisitableReadOnlyBatch, IReadOnlyBatchContext
{
Expand Down Expand Up @@ -649,7 +648,7 @@ public async ValueTask Commit(CommitOptions options)

await Db._manager.WritePages(Written, options);

var (_, newRootPage) = Db.SetNewRoot(Root);
var newRootPage = Db.SetNewRoot(Root);

// report
Db.ReportDbSize(GetRootSizeInMb(Root));
Expand Down Expand Up @@ -799,7 +798,8 @@ public override Page GetNewPage(out DbAddress addr, bool clear)

// on failure to reuse a page, default to allocating a new one.
addr = Root.Data.GetNextFreePage();
Debug.Assert(Db._manager.IsValidAddress(addr), $"The address of the next free page {addr} breaches the size of the database.");
Debug.Assert(Db._manager.IsValidAddress(addr),
$"The address of the next free page {addr} breaches the size of the database.");
}

var page = GetAtForWriting(addr);
Expand Down
14 changes: 14 additions & 0 deletions src/Paprika/Utils/RefCountingDisposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ public abstract class RefCountingDisposable : IRefCountingDisposable
private const int NoAccessors = 0;
private const int Disposing = -1;

// Always run continuations asynchronously.
private readonly TaskCompletionSource _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

/// <summary>
/// A task that is completed once this is cleaned up using <see cref="CleanUp"/>.
/// </summary>
public Task CleanedUp => _tcs.Task;

private PaddedValue _leases;

protected RefCountingDisposable(int initialCount = Single)
Expand Down Expand Up @@ -111,6 +119,7 @@ private void ReleaseLeaseOnce()
{
// set to disposed by this Release
CleanUp();
_tcs.SetResult();
}

[DoesNotReturn]
Expand Down Expand Up @@ -140,4 +149,9 @@ private struct PaddedValue
public interface IRefCountingDisposable : IDisposable
{
void AcquireLease();

/// <summary>
/// A task that is completed once this is cleaned up.
/// </summary>
public Task CleanedUp { get; }
}

0 comments on commit aeb0f13

Please sign in to comment.