Skip to content

Commit

Permalink
Different Merkle prefetching (#431)
Browse files Browse the repository at this point in the history
* Prefetcher restructured

* single prefetcher

* atomics

* format

* prefetch storage test restructured
  • Loading branch information
Scooletz authored Nov 5, 2024
1 parent e6c04bf commit 8f11944
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 181 deletions.
139 changes: 81 additions & 58 deletions src/Paprika.Tests/Chain/PrefetchingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

namespace Paprika.Tests.Chain;


public class PrefetchingTests
{
[Test]
Expand Down Expand Up @@ -85,14 +84,11 @@ private static void Set(Keccak[] accounts, uint account, IWorldState start, UInt
[TestCase(false, true, Category = Categories.LongRunning, TestName = "Storage, no prefetch")]
public async Task Spin(bool prefetch, bool storage)
{
var commits = new Stopwatch();

const int parallelism = ComputeMerkleBehavior.ParallelismNone;
const int finalityLength = 16;
const int accounts = 200_000;
const int accounts = 50_000;
const int accountsPerBlock = 100;
const int visitSameAccountCount = 2;
const int blocks = accounts / accountsPerBlock * visitSameAccountCount;
const int blocks = accounts / accountsPerBlock;

var random = new Random(13);
var keccaks = new Keccak[accounts];
Expand All @@ -102,82 +98,109 @@ public async Task Spin(bool prefetch, bool storage)
using var db = PagedDb.NativeMemoryDb(1024 * 1024 * 1024, 2);
var merkle = new ComputeMerkleBehavior(parallelism);
await using var blockchain = new Blockchain(db, merkle);
var at = 0;

const uint startBlockNumber = 1;
var parent = Keccak.EmptyTreeHash;
var finality = new Queue<Keccak>();
var prefetchFailures = 0;

for (uint i = 1; i < blocks; i++)
// Setup test by creating all the account first.
// This should ensure that he Merkle construct is created and future updates should be prefetched properly without additional db reads
using var first = blockchain.StartNew(parent);
SetAccounts(new ReadOnlyMemory<Keccak>(keccaks), first, startBlockNumber, storage);
parent = first.Commit(startBlockNumber);
blockchain.Finalize(parent);
await blockchain.WaitTillFlush(startBlockNumber);

// Run commits now with a prefetching
await RunBlocksWithPrefetching(blockchain, keccaks, parent, prefetch, storage);
return;

static async Task RunBlocksWithPrefetching(Blockchain blockchain, Keccak[] keccaks, Keccak parent,
bool prefetch, bool storage)
{
using var block = blockchain.StartNew(parent);
var finality = new Queue<Keccak>();
var prefetchFailures = 0;
var at = 0;

var slice = keccaks.AsMemory(at % accounts, accountsPerBlock);
at += accountsPerBlock;
var commits = new Stopwatch();

// Execution delay
var task = !prefetch
? Task.FromResult(true)
: Task.Factory.StartNew(() =>
{
var prefetcher = block.OpenPrefetcher();
if (prefetcher == null)
return true;
for (var i = startBlockNumber + 1; i < blocks + startBlockNumber + 1; i++)
{
using var block = blockchain.StartNew(parent);

foreach (var keccak in slice.Span)
var slice = keccaks.AsMemory(at % accounts, accountsPerBlock);
at += accountsPerBlock;

// Execution delay
var task = !prefetch
? Task.FromResult(true)
: Task.Factory.StartNew(() =>
{
if (prefetcher.CanPrefetchFurther == false)
{
return false;
}
var prefetcher = block.OpenPrefetcher();
if (prefetcher == null)
return true;

prefetcher.PrefetchAccount(keccak);
if (storage)
foreach (var keccak in slice.Span)
{
prefetcher.PrefetchStorage(keccak, keccak);
if (prefetcher.CanPrefetchFurther == false)
{
return false;
}

prefetcher.PrefetchAccount(keccak);
if (storage)
{
prefetcher.PrefetchStorage(keccak, keccak);
}
}
}

return true;
});
return true;
});

await Task.WhenAll(Task.Delay(50), task);
await Task.WhenAll(Task.Delay(50), task);

if ((await task) == false)
prefetchFailures++;
if ((await task) == false)
prefetchFailures++;

SetAccounts(slice, block, i, storage);
SetAccounts(slice, block, i, storage);

commits.Start();
parent = block.Commit(i);
commits.Stop();
commits.Start();
parent = block.Commit(i);
commits.Stop();

finality.Enqueue(parent);
finality.Enqueue(parent);

if (finality.Count > finalityLength)
if (finality.Count > finalityLength)
{
blockchain.Finalize(finality.Dequeue());
}
}

while (finality.TryDequeue(out var k))
{
blockchain.Finalize(finality.Dequeue());
blockchain.Finalize(k);
}
}

while (finality.TryDequeue(out var k))
{
blockchain.Finalize(k);
Console.WriteLine($"Prefetch failures: {prefetchFailures}. Commit time {commits.Elapsed:g}");
}

Console.WriteLine($"Prefetch failures: {prefetchFailures}. Commit time {commits.Elapsed:g}");
}

private static void SetAccounts(ReadOnlyMemory<Keccak> slice, IWorldState block, uint i, bool storage)
{
Span<byte> value = stackalloc byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(value, i);

foreach (var keccak in slice.Span)
static void SetAccounts(ReadOnlyMemory<Keccak> slice, IWorldState block, uint i, bool storage)
{
block.SetAccount(keccak, new Account(i, i));
if (storage)
Span<byte> value = stackalloc byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(value, i);

foreach (var keccak in slice.Span)
{
block.SetStorage(keccak, keccak, value);
block.SetAccount(keccak, new Account(i, i));
if (storage)
{
block.SetStorage(keccak, keccak, value);

if (i == startBlockNumber)
{
// Additional slot for the start, so that a branch is created and there's something to prefetch.
block.SetStorage(keccak, Keccak.EmptyTreeHash, value);
}
}
}
}
}
Expand Down
Loading

0 comments on commit 8f11944

Please sign in to comment.