Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Head tracking batch #417

Draft
wants to merge 69 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
b0bbeca
sketch of the page table
Scooletz Oct 23, 2024
a69330a
renaming and simplifying HeadBatch
Scooletz Nov 12, 2024
0817c05
renaming
Scooletz Nov 12, 2024
922b5a6
reverse mapping
Scooletz Nov 12, 2024
d1c7675
more extraction
Scooletz Nov 12, 2024
b3c1469
Merge branch 'main' into page-table
Scooletz Nov 13, 2024
2bc911f
moving more members, making base context less dependent
Scooletz Nov 13, 2024
6409faf
stats and build
Scooletz Nov 13, 2024
c218587
head tracking batch
Scooletz Nov 13, 2024
96e7c33
separation of apis
Scooletz Nov 13, 2024
30edee7
towards implementation of the tracking
Scooletz Nov 13, 2024
4dea197
refactored root finding
Scooletz Nov 14, 2024
b199b61
towards testable MultiHeadChain
Scooletz Nov 14, 2024
8e37291
reading from multihead and proper disposal
Scooletz Nov 14, 2024
3fb6d8a
green test with reads
Scooletz Nov 14, 2024
164a8ad
GetAtWriting to ensure that MultiHead can track pages properly with a…
Scooletz Nov 14, 2024
2797d33
clearing
Scooletz Nov 14, 2024
d1d17f6
multi block
Scooletz Nov 14, 2024
6862627
moving metadata to Commit of the head
Scooletz Nov 15, 2024
4bc3872
ref counting proposed batches
Scooletz Nov 15, 2024
fee1ed1
fix doubled root creation
Scooletz Nov 15, 2024
86f6675
new API to support non-mmapped writes
Scooletz Nov 15, 2024
989e726
flusher in MultiHead
Scooletz Nov 15, 2024
83f5882
into the channel
Scooletz Nov 15, 2024
33e088a
async disposable and finalization mechanism
Scooletz Nov 18, 2024
fa2f90a
finality and block lock
Scooletz Nov 18, 2024
cc6cbc8
reporting
Scooletz Nov 18, 2024
ec253ba
tests fixups
Scooletz Nov 19, 2024
62c6035
proper finalization handling for long chains of blocks
Scooletz Nov 19, 2024
e92e611
ref counting interface
Scooletz Nov 19, 2024
2dc59f8
HeadReader
Scooletz Nov 19, 2024
738ede5
reader offloading state
Scooletz Nov 19, 2024
e5f4879
readonly batch
Scooletz Nov 19, 2024
66f1ed4
no trace in pool
Scooletz Nov 19, 2024
a67ff8e
reading added
Scooletz Nov 20, 2024
d31da73
reader management
Scooletz Nov 20, 2024
b6abdb9
almost migrated
Scooletz Nov 20, 2024
b9103bf
more
Scooletz Nov 20, 2024
c3dd619
towards working tests
Scooletz Nov 21, 2024
e941655
moving blockchain to use multihead
Scooletz Nov 21, 2024
84d580b
stats updated
Scooletz Nov 21, 2024
f7293a0
towards working WorldState
Scooletz Nov 21, 2024
988edf6
abandoned registration and more
Scooletz Nov 22, 2024
7711369
two more tests
Scooletz Nov 22, 2024
9bccfb3
concurrent dictionary for mappings
Scooletz Nov 22, 2024
1afa56b
proper min batch id calculation as well as the reader creation
Scooletz Nov 25, 2024
41869b7
reversed map removal
Scooletz Nov 26, 2024
671f167
proper overwrites criteria
Scooletz Nov 26, 2024
f074f97
no hash normalization by setting the first root and proper disposal
Scooletz Nov 27, 2024
316e331
test fixes
Scooletz Nov 27, 2024
8ac678b
no copy needed
Scooletz Nov 27, 2024
c1033ec
one more with zero Keccak
Scooletz Nov 27, 2024
11a5836
test fixups
Scooletz Nov 27, 2024
d89e30a
Merge branch 'main' into page-table
Scooletz Nov 27, 2024
aeb0f13
Reader disposal made awaitable and properly handled in the FlusherTask
Scooletz Nov 28, 2024
b05b8a6
restored BitMapFilter
Scooletz Nov 28, 2024
f73b43e
Added last finalized api
Scooletz Nov 29, 2024
f099f27
test added for the LeaseLatest
Scooletz Nov 29, 2024
717ccd6
flushing events
Scooletz Nov 29, 2024
365fb3f
throw on not found
Scooletz Nov 29, 2024
494e783
MultiHead allows to create non-committable world state
Scooletz Nov 29, 2024
00397e5
pageTable cache in front of a dictionary
Scooletz Dec 3, 2024
c9916cb
atomic clean up
Scooletz Dec 3, 2024
9e48831
update cache only on mapping changes
Scooletz Dec 4, 2024
cb1990d
override removal made a bit faster
Scooletz Dec 4, 2024
a790db3
close prefetcher after commit
Scooletz Dec 4, 2024
bef584a
Merge branch 'main' into page-table
Scooletz Dec 6, 2024
a686b6c
prefetching disposal got right plus test fixes
Scooletz Dec 9, 2024
06f783e
Update src/Paprika/Store/PagedDb.cs
Scooletz Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Paprika.Tests/Store/BasePageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal class TestBatchContext(uint batchId, Stack<DbAddress>? reusable = null)

public override Page GetAt(DbAddress address) => _address2Page[address];

public override void Prefetch(DbAddress address)
public override void Prefetch(DbAddress addr)
{ }

public override DbAddress GetAddress(Page page) => _page2Address[page.Raw];
Expand Down
2 changes: 1 addition & 1 deletion src/Paprika/Store/BatchContextBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ abstract class BatchContextBase(uint batchId) : IBatchContext
public uint BatchId { get; } = batchId;

public abstract Page GetAt(DbAddress address);
public abstract void Prefetch(DbAddress address);
public abstract void Prefetch(DbAddress addr);

public abstract DbAddress GetAddress(Page page);

Expand Down
2 changes: 1 addition & 1 deletion src/Paprika/Store/IBatchContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public interface IPageResolver
/// </summary>
Page GetAt(DbAddress address);

void Prefetch(DbAddress address);
void Prefetch(DbAddress addr);

void Prefetch(ReadOnlySpan<DbAddress> addresses)
{
Expand Down
5 changes: 0 additions & 5 deletions src/Paprika/Store/IPageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ public interface IPageManager : IDisposable, IPageResolver
{
DbAddress GetAddress(in Page page);

/// <summary>
/// Gets the page for writing purposes, ensuring that the page that is requested is finalized on disk.
/// </summary>
Page GetAtForWriting(DbAddress address, bool reused);

/// <summary>
/// Writes pages specified by <paramref name="addresses"/> to the underlying storage.
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions src/Paprika/Store/PageManagers/PointerPageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ public abstract unsafe class PointerPageManager(long size) : IPageManager

protected abstract void* Ptr { get; }

public void Prefetch(DbAddress address)
public void Prefetch(DbAddress addr)
{
if (Sse.IsSupported)
{
if (address.IsNull || address.Raw > (uint)MaxPage)
if (addr.IsNull || addr.Raw > (uint)MaxPage)
{
return;
}

// Fetch to L2 cache as we don't know if will need it
// So don't pollute L1 cache
Sse.Prefetch1((byte*)Ptr + address.FileOffset);
Sse.Prefetch1((byte*)Ptr + addr.FileOffset);
}
}

Expand Down
136 changes: 102 additions & 34 deletions src/Paprika/Store/PagedDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static PagedDb MemoryMappedDb(long size, byte historyDepth, string direct
new MemoryMappedPageManager(size, historyDepth, directory,
flushToDisk ? PersistenceOptions.FlushFile : PersistenceOptions.MMapOnly), historyDepth);

public void Prefetch(DbAddress address) => _manager.Prefetch(address);
public void Prefetch(DbAddress addr) => _manager.Prefetch(addr);

private void ReportReads(long number) => _reads.Add(number);

Expand Down Expand Up @@ -427,8 +427,6 @@ static void ThrowOnlyOneBatch()

public Page GetAt(DbAddress address) => _manager.GetAt(address);

private Page GetAtForWriting(DbAddress address, bool reused) => _manager.GetAtForWriting(address, reused);

/// <summary>
/// Sets the new root but does not bump the _lastRoot that should be done in a lock.
/// </summary>
Expand Down Expand Up @@ -506,16 +504,88 @@ public IDictionary<Keccak, uint> IdCache
}
}

public void Prefetch(DbAddress address) => db.Prefetch(address);
public void Prefetch(DbAddress addr) => db.Prefetch(addr);

public Page GetAt(DbAddress address) => db._manager.GetAt(address);

public override string ToString() => $"{nameof(ReadOnlyBatch)}, Name: {name}, BatchId: {BatchId}";
}

/// <summary>
/// A linked batch allows for an arbitrary depth nesting of transactions and committing them bottom up.
/// </summary>
/// <remarks>
/// As the mappings are stored in the page table, the linked batch allows querying the data with
/// one dictionary lookup and a fallback to the database.
///
/// Additionally, it can be built with other linked batches as dependencies and copy from their page tables.
/// The crux here is that only the previous one requires visiting as it has the copies of the page mappings
/// from the previous one. As the pages are overwritten when needed, the dictionary should not be too big.
/// </remarks>
private class LinkedBatch : Batch
{
    // The highest bit of the stored pointer marks a page that was already copied (written) in this batch.
    // On a 32-bit process the pointer is 32 bits wide, hence bit 31; on 64-bit, bit 63.
    private static readonly UIntPtr WriteMarker = new(1UL << (UIntPtr.Size == 4 ? 31 : 63));
    private static readonly UIntPtr AddressMask = ~WriteMarker;

    // Maps a db address to the in-memory page owned by this batch.
    // The value is the raw page pointer, tagged with WriteMarker once copied for writing in this batch.
    private readonly Dictionary<DbAddress, UIntPtr> _pageTable = new();

    public LinkedBatch(PagedDb db, RootPage root, uint reusePagesOlderThanBatchId, Context ctx)
        : base(db, root, reusePagesOlderThanBatchId, ctx)
    {
    }

    // Pages served by this batch live in the managed page table, so there is nothing to prefetch.
    public override void Prefetch(DbAddress addr) { }

    /// <summary>
    /// Resolves a page, consulting the page table first and falling back to the database.
    /// For writes, ensures the returned page is a copy owned by this batch (copy-on-write).
    /// </summary>
    protected override unsafe Page GetAtImpl(DbAddress addr, bool write)
    {
        ref var slot = ref CollectionsMarshal.GetValueRefOrNullRef(_pageTable, addr);

        if (!Unsafe.IsNullRef(ref slot))
        {
            // The mapping exists in this batch's page table.
            var writtenThisBatch = (slot & WriteMarker) == WriteMarker;

            var ptr = (byte*)(slot & AddressMask).ToPointer();
            var source = new Page(ptr);

            if (!write || writtenThisBatch)
            {
                return source;
            }

            // Mapped but not yet written this batch: copy-on-write into a batch-owned page.
            // slot is a live ref into the dictionary, so writing it updates the entry in place.
            return MakeCopy(source, out slot);
        }

        // Does not exist in the page table, fetch from the db.
        var fromDb = Db.GetAt(addr);

        // Reads go straight through; only writes require a private copy.
        if (!write)
        {
            return fromDb;
        }

        // The entry did not exist before. slot is a null ref here (GetValueRefOrNullRef miss),
        // so the new tagged pointer MUST go through a local and an explicit dictionary insert —
        // writing through the null ref would crash.
        var copy = MakeCopy(fromDb, out var newSlot);
        _pageTable[addr] = newSlot;
        return copy;
    }

    /// <summary>
    /// Copies <paramref name="source"/> into a page owned by this batch and returns
    /// the copy together with its write-tagged slot value.
    /// </summary>
    private static Page MakeCopy(Page source, out UIntPtr slot)
    {
        // TODO: use the pool, get a page and map to it. Intended shape:
        //   Page copy = <rent from pool>;
        //   source.CopyTo(copy);
        //   slot = copy.Raw | WriteMarker;
        //   return copy;
        throw new NotImplementedException("Use pool, get a page and map to it");
    }
}

class Batch : BatchContextBase, IBatch
{
private readonly PagedDb _db;
protected readonly PagedDb Db;
private readonly RootPage _root;
private readonly uint _reusePagesOlderThanBatchId;
private bool _verify = false;
Expand Down Expand Up @@ -543,7 +613,7 @@ class Batch : BatchContextBase, IBatch
public Batch(PagedDb db, RootPage root, uint reusePagesOlderThanBatchId, Context ctx) : base(
root.Header.BatchId)
{
_db = db;
Db = db;
_root = root;
_reusePagesOlderThanBatchId = reusePagesOlderThanBatchId;
_ctx = ctx;
Expand Down Expand Up @@ -577,7 +647,7 @@ static void ThrowDisposed()
}
}

public void VerifyNoPagesMissing() => MissingPagesVisitor.VerifyNoPagesMissing(_root, _db, this);
public void VerifyNoPagesMissing() => MissingPagesVisitor.VerifyNoPagesMissing(_root, Db, this);

public void SetMetadata(uint blockNumber, in Keccak blockHash)
{
Expand Down Expand Up @@ -632,33 +702,33 @@ public async ValueTask Commit(CommitOptions options)
}

// report metrics
_db.ReportPageStatsPerCommit(_written.Count, _metrics.PagesReused, _metrics.PagesAllocated,
Db.ReportPageStatsPerCommit(_written.Count, _metrics.PagesReused, _metrics.PagesAllocated,
_abandoned.Count, _metrics.RegisteredToReuseAfterWritingThisBatch);

_db.ReportReads(_metrics.Reads);
_db.ReportWrites(_metrics.Writes);
Db.ReportReads(_metrics.Reads);
Db.ReportWrites(_metrics.Writes);

#if TRACKING_REUSED_PAGES
_db._reusablePages.Set(_db._registeredForReuse.Count);
#endif

await _db._manager.WritePages(_written, options);
await Db._manager.WritePages(_written, options);

var newRootPage = _db.SetNewRoot(_root);
var newRootPage = Db.SetNewRoot(_root);

// report
_db.ReportDbSize(GetRootSizeInMb(_root));
Db.ReportDbSize(GetRootSizeInMb(_root));

await _db._manager.WriteRootPage(newRootPage, options);
await Db._manager.WriteRootPage(newRootPage, options);

lock (_db._batchLock)
lock (Db._batchLock)
{
_db.CommitNewRoot();
Debug.Assert(ReferenceEquals(this, _db._batchCurrent));
_db._batchCurrent = null;
Db.CommitNewRoot();
Debug.Assert(ReferenceEquals(this, Db._batchCurrent));
Db._batchCurrent = null;
}

_db.ReportCommit(watch.Elapsed);
Db.ReportCommit(watch.Elapsed);
}

public void VerifyDbPagesOnCommit()
Expand All @@ -667,42 +737,40 @@ public void VerifyDbPagesOnCommit()
}

[DebuggerStepThrough]
public override Page GetAt(DbAddress address)
public sealed override Page GetAt(DbAddress address)
{
// Getting a page beyond root!
var nextFree = _root.Data.NextFreePage;
Debug.Assert(address < nextFree, $"Breached the next free page, NextFree: {nextFree}, retrieved {address}");
var page = _db.GetAt(address);
return page;

return GetAtImpl(address, false);
}

public override void Prefetch(DbAddress address) => _db.Prefetch(address);
protected virtual Page GetAtImpl(DbAddress addr, bool write) => Db.GetAt(addr);

public override DbAddress GetAddress(Page page) => _db.GetAddress(page);
public override void Prefetch(DbAddress addr) => Db.Prefetch(addr);

public override DbAddress GetAddress(Page page) => Db.GetAddress(page);

public override Page GetNewPage(out DbAddress addr, bool clear)
{
bool reused;
if (_reusedImmediately.TryPop(out addr))
{
reused = true;
_metrics.PagesReused++;
}
else if (TryGetNoLongerUsedPage(out addr))
{
reused = true;
_metrics.PagesReused++;
}
else
{
reused = false;
_metrics.PagesAllocated++;

// on failure to reuse a page, default to allocating a new one.
addr = _root.Data.GetNextFreePage();
}

var page = _db.GetAtForWriting(addr, reused);
var page = GetAtImpl(addr, true);

// if (reused)
// {
Expand Down Expand Up @@ -780,7 +848,7 @@ private void RegisterForFutureReuse(DbAddress addr)

public override void RegisterForFutureReuse(Page page, bool possibleImmediateReuse = false)
{
var addr = _db.GetAddress(page);
var addr = Db.GetAddress(page);

if (page.Header.BatchId == BatchId)
{
Expand Down Expand Up @@ -813,16 +881,16 @@ public void Dispose()
{
_disposed = true;

lock (_db._batchLock)
lock (Db._batchLock)
{
if (ReferenceEquals(_db._batchCurrent, this))
if (ReferenceEquals(Db._batchCurrent, this))
{
_db._batchCurrent = null;
Db._batchCurrent = null;
}

// clear and return
_ctx.Clear();
_db._ctx = _ctx;
Db._ctx = _ctx;
}
}
}
Expand Down
Loading