diff --git a/src/Paprika.Tests/ReadForbiddingDb.cs b/src/Paprika.Tests/ReadForbiddingDb.cs
index 8a319a8d..e2a6f6f1 100644
--- a/src/Paprika.Tests/ReadForbiddingDb.cs
+++ b/src/Paprika.Tests/ReadForbiddingDb.cs
@@ -16,7 +16,7 @@ public class ReadForbiddingDb(IDb db) : IDb
     public IReadOnlyBatch BeginReadOnlyBatchOrLatest(in Keccak stateHash, string name = "") => new ReadOnlyBatch(db.BeginReadOnlyBatchOrLatest(in stateHash, name), this);
 
-    public IReadOnlyBatch[] SnapshotAll() => db.SnapshotAll();
+    public IReadOnlyBatch[] SnapshotAll(bool withoutOldest = false) => db.SnapshotAll(withoutOldest);
 
     public bool HasState(in Keccak keccak) => db.HasState(in keccak);
 
     public int HistoryDepth => db.HistoryDepth;
diff --git a/src/Paprika.Tests/Store/MultiHeadChainTests.cs b/src/Paprika.Tests/Store/MultiHeadChainTests.cs
index bee90a6c..15bbeeb8 100644
--- a/src/Paprika.Tests/Store/MultiHeadChainTests.cs
+++ b/src/Paprika.Tests/Store/MultiHeadChainTests.cs
@@ -173,4 +173,79 @@ void Assert()
             }
         }
     }
+
+    [Test]
+    public async Task Old_reader_kept_alive_keeps_finalization_from_processing()
+    {
+        const byte blocks = 64;
+
+        using var db = PagedDb.NativeMemoryDb(1 * Mb, 2);
+
+        var random = new Random(Seed);
+
+        await using var multi = db.OpenMultiHeadChain();
+
+        using var head = multi.Begin(Keccak.EmptyTreeHash);
+
+        var finalized = new List<Task>();
+
+        var account = Keccak.OfAnEmptySequenceRlp;
+
+        byte[] expectedReaderValue = [];
+        byte[] lastWrittenValue = [];
+        Keccak lastWrittenKeccak = default;
+
+        IHeadReader? reader = null;
+        for (byte i = 0; i < blocks; i++)
+        {
+            lastWrittenKeccak = random.NextKeccak();
+
+            lastWrittenValue = [i];
+            head.SetRaw(Key.Account(account), lastWrittenValue);
+            head.Commit((uint)(i + 1), lastWrittenKeccak);
+
+            if (i == 0)
+            {
+                expectedReaderValue = lastWrittenValue;
+
+                // the first block should be set and finalized
+                await multi.Finalize(lastWrittenKeccak);
+                multi.TryLeaseReader(lastWrittenKeccak, out reader).Should().BeTrue();
+            }
+            else
+            {
+                // Just register finalization, it won't be finalized as the reader will keep it from going on
+                finalized.Add(multi.Finalize(lastWrittenKeccak));
+            }
+        }
+
+        AssertReader(reader, account, expectedReaderValue);
+
+        finalized.Should().AllSatisfy(t => t.Status.Should().Be(TaskStatus.WaitingForActivation));
+
+        reader!.Dispose();
+        await reader.CleanedUp;
+        await Task.WhenAll(finalized);
+
+        // Everything is finalized and written, try read the last value now
+        AssertLastWrittenValue(multi, lastWrittenKeccak, account, lastWrittenValue);
+
+        return;
+
+        static void AssertReader(IHeadReader? reader, Keccak account, byte[] expectedReaderValue)
+        {
+            reader.Should().NotBeNull();
+
+            reader!.TryGet(Key.Account(account), out var actualReaderValue).Should().BeTrue();
+            actualReaderValue.SequenceEqual(expectedReaderValue).Should().BeTrue();
+        }
+
+        static void AssertLastWrittenValue(IMultiHeadChain multi, Keccak stateHash, Keccak account, byte[] lastWrittenValue)
+        {
+            multi.TryLeaseReader(stateHash, out var lastReader).Should().BeTrue();
+            lastReader.TryGet(Key.Account(account), out var actualLastWrittenValue).Should().BeTrue();
+            actualLastWrittenValue.SequenceEqual(lastWrittenValue).Should().BeTrue();
+            lastReader.Dispose();
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/Paprika/IDb.cs b/src/Paprika/IDb.cs
index ecbb4856..69ca2a3f 100644
--- a/src/Paprika/IDb.cs
+++ b/src/Paprika/IDb.cs
@@ -31,7 +31,7 @@ public interface IDb
     /// Performs a snapshot of all the valid roots in the database.
     /// </summary>
     /// <returns>An array of roots.</returns>
-    IReadOnlyBatch[] SnapshotAll();
+    IReadOnlyBatch[] SnapshotAll(bool withoutOldest = false);
 
     /// <summary>
     /// Whether there's a state with the given keccak.
diff --git a/src/Paprika/Store/PagedDb.MultiHeadChain.cs b/src/Paprika/Store/PagedDb.MultiHeadChain.cs
index 3f900c03..269fe287 100644
--- a/src/Paprika/Store/PagedDb.MultiHeadChain.cs
+++ b/src/Paprika/Store/PagedDb.MultiHeadChain.cs
@@ -39,6 +39,7 @@ private class MultiHeadChain : IMultiHeadChain
 
         // Readers
        private readonly Dictionary<Keccak, Reader> _readers = new();
+        private readonly Queue<Reader> _readersDisposalQueue = new();
         private readonly ReaderWriterLockSlim _readerLock = new();
 
         // Proposed batches that are finalized
@@ -67,11 +68,19 @@ public MultiHeadChain(PagedDb db)
 
             _pool = _db._pool;
 
-            foreach (var batch in _db.SnapshotAll())
+            // Snapshot all without the oldest, we want to keep only N-1 readers
+            var allWithoutTheOldest = _db.SnapshotAll(withoutOldest: true);
+
+            // Sort from oldest to youngest
+            Array.Sort(allWithoutTheOldest, (a, b) => a.BatchId.CompareTo(b.BatchId));
+
+            foreach (var batch in allWithoutTheOldest)
             {
                 _lastCommittedBatch = Math.Max(batch.BatchId, _lastCommittedBatch);
                 var reader = new Reader(_db, CreateNextRoot([], (ReadOnlyBatch)batch), batch, [], _pool);
                 RegisterReader(reader);
+
+                _readersDisposalQueue.Enqueue(reader);
             }
 
             _flusher = FlusherTask();
@@ -263,7 +272,7 @@ private async Task FlusherTask()
                         await _db._manager.WritePages(batch.Changes, options);
 
                         // Set new root
-                        var (previousRootStateHash, newRootPage) = _db.SetNewRoot(batch.Root);
+                        var newRootPage = _db.SetNewRoot(batch.Root);
 
                         // report
                         _db.ReportDbSize(GetRootSizeInMb(batch.Root));
@@ -272,7 +281,7 @@ private async Task FlusherTask()
                         List<ProposedBatch> removed;
 
-                        Reader newIReader;
+                        Reader newReader;
 
                         lock (_db._batchLock)
                         {
                             _db.CommitNewRoot();
@@ -290,10 +299,11 @@ private async Task FlusherTask()
                             var read = BuildDependencies(batch.Root.Data.Metadata.StateHash, out var root, out _,
                                 out var proposed);
 
-                            newIReader = new Reader(_db, root, read, proposed, _pool);
+                            newReader = new Reader(_db, root, read, proposed, _pool);
                         }
 
-                        RegisterNewReaderAfterFinalization(removed, newIReader, previousRootStateHash);
+                        // Register the new reader and await the disposal of the oldest one.
+                        await RegisterNewReaderAfterFinalization(removed, newReader);
 
                         // Only now dispose the removed as they had their data used above.
                         foreach (var b in removed)
@@ -315,33 +325,34 @@ private async Task FlusherTask()
             }
         }
 
-        private void RegisterNewReaderAfterFinalization(List<ProposedBatch> removed, Reader newReader,
-            Keccak previousRootStateHash)
+        private Task RegisterNewReaderAfterFinalization(List<ProposedBatch> removed, Reader newReader)
        {
             var toDispose = new List<Reader>();
+            Reader? oldest = null;
 
             // Update readers
             _readerLock.EnterWriteLock();
             try
             {
-                Reader? headReader;
-
                 // Remove the previous
                 foreach (var b in removed)
                 {
-                    if (_readers.Remove(b.StateHash, out headReader))
+                    if (_readers.Remove(b.StateHash, out var headReader))
                     {
                         toDispose.Add(headReader);
                     }
                 }
 
-                // Register the new one
+                // Register the new one in the dictionary and in the disposal queue
                 _readers[newReader.Metadata.StateHash] = newReader;
 
-                // Remove the previous state root if exists
-                if (_readers.Remove(previousRootStateHash, out headReader))
+                _readersDisposalQueue.Enqueue(newReader);
+
+                if (_readersDisposalQueue.Count == _db._historyDepth)
                 {
-                    toDispose.Add(headReader);
+                    // Ensure that we keep only N-1 readers for the history, so that the next spin can copy over to the root.
+                    oldest = _readersDisposalQueue.Dequeue();
+                    _readers.Remove(oldest.Metadata.StateHash);
                 }
             }
             finally
@@ -354,6 +365,12 @@ private void RegisterNewReaderAfterFinalization(List<ProposedBatch> removed, Rea
             {
                 reader.Dispose();
             }
+
+            if (oldest == null)
+                return Task.CompletedTask;
+
+            oldest.Dispose();
+            return oldest.CleanedUp;
         }
 
         private ProposedBatch FindProposed(Keccak keccak)
diff --git a/src/Paprika/Store/PagedDb.cs b/src/Paprika/Store/PagedDb.cs
index e765111d..87293bfe 100644
--- a/src/Paprika/Store/PagedDb.cs
+++ b/src/Paprika/Store/PagedDb.cs
@@ -282,13 +282,15 @@ public IReadOnlyBatch BeginReadOnlyBatchOrLatest(in Keccak stateHash, string nam
         }
     }
 
-    public IReadOnlyBatch[] SnapshotAll()
+    public IReadOnlyBatch[] SnapshotAll(bool withoutOldest = false)
     {
         var batches = new List<IReadOnlyBatch>();
 
+        var limit = withoutOldest ? _historyDepth - 1 : _historyDepth;
+
         lock (_batchLock)
         {
-            for (var back = 0; back < _historyDepth; back++)
+            for (var back = 0; back < limit; back++)
             {
                 if (_lastRoot - back < 0)
                 {
@@ -395,7 +397,6 @@ private void DisposeReadOnlyBatch(ReadOnlyBatch batch)
             {
                 _lowestReadTxBatch.Set((int)_batchesReadOnly.Min(b => b.BatchId));
             }
-
         }
     }
@@ -425,7 +426,8 @@ private IBatch BuildFromRoot(RootPage current)
 
     [DoesNotReturn]
     [StackTraceHidden]
-    private static void ThrowOnlyOneBatch() => throw new Exception("There is another batch active at the moment. Commit the other first");
+    private static void ThrowOnlyOneBatch() =>
+        throw new Exception("There is another batch active at the moment. Commit the other first");
 
     [DoesNotReturn]
     [StackTraceHidden]
@@ -504,20 +506,17 @@ private static RootPage CreateNextRoot(RootPage current, BufferPool pool)
     /// <summary>
     /// Sets the new root but does not bump the _lastRoot that should be done in a lock.
     /// </summary>
-    private (Keccak previousRootStateHash, DbAddress newRootPage) SetNewRoot(RootPage root)
+    private DbAddress SetNewRoot(RootPage root)
     {
         var pageAddress = (_lastRoot + 1) % _historyDepth;
         var destination = _roots[pageAddress];
-
-        var previousStateHash = destination.Data.Metadata.StateHash;
-
         root.CopyTo(destination);
-        return (previousStateHash, DbAddress.Page((uint)pageAddress));
+
+        return DbAddress.Page((uint)pageAddress);
     }
 
     private void CommitNewRoot() => _lastRoot += 1;
 
-
     private sealed class ReadOnlyBatch(PagedDb db, RootPage root, string name) : IVisitableReadOnlyBatch,
         IReadOnlyBatchContext
     {
@@ -649,7 +648,7 @@ public async ValueTask Commit(CommitOptions options)
 
             await Db._manager.WritePages(Written, options);
 
-            var (_, newRootPage) = Db.SetNewRoot(Root);
+            var newRootPage = Db.SetNewRoot(Root);
 
             // report
             Db.ReportDbSize(GetRootSizeInMb(Root));
@@ -799,7 +798,8 @@ public override Page GetNewPage(out DbAddress addr, bool clear)
                 // on failure to reuse a page, default to allocating a new one.
                 addr = Root.Data.GetNextFreePage();
 
-                Debug.Assert(Db._manager.IsValidAddress(addr), $"The address of the next free page {addr} breaches the size of the database.");
+                Debug.Assert(Db._manager.IsValidAddress(addr),
+                    $"The address of the next free page {addr} breaches the size of the database.");
             }
 
             var page = GetAtForWriting(addr);
diff --git a/src/Paprika/Utils/RefCountingDisposable.cs b/src/Paprika/Utils/RefCountingDisposable.cs
index e43ff99d..fa4a91d2 100644
--- a/src/Paprika/Utils/RefCountingDisposable.cs
+++ b/src/Paprika/Utils/RefCountingDisposable.cs
@@ -13,6 +13,14 @@ public abstract class RefCountingDisposable : IRefCountingDisposable
     private const int NoAccessors = 0;
     private const int Disposing = -1;
 
+    // Always run continuations asynchronously.
+    private readonly TaskCompletionSource _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+    /// <summary>
+    /// A task that is completed once this is cleaned up using <see cref="CleanUp"/>.
+    /// </summary>
+    public Task CleanedUp => _tcs.Task;
+
     private PaddedValue _leases;
 
     protected RefCountingDisposable(int initialCount = Single)
@@ -111,6 +119,7 @@ private void ReleaseLeaseOnce()
         {
             // set to disposed by this Release
            CleanUp();
+            _tcs.SetResult();
         }
 
    [DoesNotReturn]
@@ -140,4 +149,9 @@ private struct PaddedValue
 public interface IRefCountingDisposable : IDisposable
 {
     void AcquireLease();
+
+    /// <summary>
+    /// A task that is completed once this is cleaned up.
+    /// </summary>
+    public Task CleanedUp { get; }
 }
\ No newline at end of file
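For reference, the consumer-side pattern this change enables (dispose a leased reader, then await its `CleanedUp` task before expecting pending finalizations to complete) looks roughly like the sketch below. It is a minimal illustration assembled only from the APIs exercised by the test above, not part of the diff itself; the namespaces, the `Mb` constant and the placeholder state hashes are assumptions.

```csharp
// Minimal sketch, assuming the Paprika APIs as used in MultiHeadChainTests above.
// Namespaces and placeholder hashes are assumptions; everything else mirrors the test.
using Paprika.Crypto;
using Paprika.Data;
using Paprika.Store;

const long Mb = 1024 * 1024;

using var db = PagedDb.NativeMemoryDb(1 * Mb, 2);
await using var multi = db.OpenMultiHeadChain();

var account = Keccak.OfAnEmptySequenceRlp;
var block1 = Keccak.Compute("block1"u8); // placeholder state hashes
var block2 = Keccak.Compute("block2"u8);

using var head = multi.Begin(Keccak.EmptyTreeHash);

// Commit and finalize the first block.
head.SetRaw(Key.Account(account), [1]);
head.Commit(1, block1);
await multi.Finalize(block1);

// Lease a reader pinned to block 1; while the lease is held, the flusher cannot
// recycle the root slot this reader still depends on.
multi.TryLeaseReader(block1, out var reader);

head.SetRaw(Key.Account(account), [2]);
head.Commit(2, block2);
var finalized = multi.Finalize(block2); // stays pending while the old reader is alive

reader.Dispose();       // release the lease
await reader.CleanedUp; // wait until the ref-counted reader has actually cleaned up
await finalized;        // only now can finalization of block 2 proceed
```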