Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platform specific prefetcher #396

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
7 changes: 4 additions & 3 deletions src/Paprika.Tests/Store/BasePageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ internal class TestBatchContext(uint batchId, Stack<DbAddress>? reusable = null)

/// <summary>
/// Looks up the page stored for <paramref name="address"/> in the address-to-page map of this test context.
/// </summary>
public override Page GetAt(DbAddress address)
{
    return _address2Page[address];
}

public override void Prefetch(DbAddress address)
{ }
// Intentionally empty: this test context performs no work when asked to prefetch.
public override void Prefetch(ReadOnlySpan<DbAddress> addresses)
{
}

/// <summary>
/// Looks up the address stored for <paramref name="page"/> in the page-to-address map, keyed by the page's raw pointer.
/// </summary>
public override DbAddress GetAddress(Page page)
{
    return _page2Address[page.Raw];
}

Expand Down Expand Up @@ -106,4 +107,4 @@ public TestBatchContext Next()
}

/// <summary>
/// Creates a fresh <see cref="TestBatchContext"/> for the given batch id.
/// </summary>
internal static TestBatchContext NewBatch(uint batchId)
{
    return new TestBatchContext(batchId);
}
}
}
96 changes: 96 additions & 0 deletions src/Paprika/Platform.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using System.ComponentModel;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

namespace Paprika;

/// <summary>
/// Operating-system specific memory helpers. The concrete implementation is selected once,
/// when the type is initialized, based on the platform the process is running on.
/// </summary>
public static class Platform
{
    /// <summary>
    /// Asks the operating system to prefetch a set of equally sized memory ranges.
    /// </summary>
    /// <param name="addresses">Start address of each range. An empty span is a no-op.</param>
    /// <param name="size">Length, in bytes, applied to every range.</param>
    /// <exception cref="Win32Exception">On Windows, when the underlying OS call reports a failure.</exception>
    public static void Prefetch(ReadOnlySpan<UIntPtr> addresses, UIntPtr size) => Manager.SchedulePrefetch(addresses, size);

    // Only Windows gets the kernel32-based implementation. Every other platform uses the POSIX manager,
    // so an OS that is neither Linux nor macOS is never routed to a kernel32 P/Invoke.
    private static readonly IMemoryManager Manager =
        RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
            ? new WindowsMemoryManager()
            : new PosixMemoryManager();

    private sealed class PosixMemoryManager : IMemoryManager
    {
        // Not marked with [Flags]: these are distinct advice codes, not combinable bits.
        private enum Advice : int
        {
            // ReSharper disable InconsistentNaming
            /// <summary>
            /// Expect access in the near future. (Hence, it might be a good idea to read some pages ahead.)
            /// </summary>
            MADV_WILLNEED = 0x3,

            /// <summary>
            /// Do not expect access in the near future.
            /// (For the time being, the application is finished with the given range,
            /// so the kernel can free resources associated with it.)
            /// </summary>
            MADV_DONTNEED = 0x4,
            // ReSharper restore InconsistentNaming
        }

        // Declared for the future implementation below; it is not called yet.
        [DllImport("libc", SetLastError = true)]
        private static extern int madvise(IntPtr addr, UIntPtr length, Advice advice);

        /// <summary>
        /// Currently a no-op: the POSIX prefetch has not been implemented yet.
        /// </summary>
        public void SchedulePrefetch(ReadOnlySpan<UIntPtr> addresses, UIntPtr length)
        {
            // TODO: issue madvise(MADV_WILLNEED) for each range.
        }
    }

    private sealed class WindowsMemoryManager : IMemoryManager
    {
        /// <summary>
        /// Upper bound of range entries passed to the OS in one call. It also bounds the stack buffer,
        /// so a large input span cannot overflow the stack.
        /// </summary>
        private const int MaxEntriesPerCall = 128;

        /// <summary>
        /// The flags argument of PrefetchVirtualMemory is reserved and must be zero.
        /// </summary>
        private const uint Reserved = 0;

        // numberOfEntries is ULONG_PTR in the native signature, hence UIntPtr rather than a fixed 64-bit integer.
        [DllImport("kernel32.dll", SetLastError = true)]
        private static extern unsafe bool PrefetchVirtualMemory(IntPtr hProcess, UIntPtr numberOfEntries,
            Win32MemoryRangeEntry* entries, uint flags);

        // Returns a pseudo handle for the current process. Unlike Process.GetCurrentProcess(),
        // it allocates nothing and requires no disposal.
        [DllImport("kernel32.dll")]
        private static extern IntPtr GetCurrentProcess();

        [StructLayout(LayoutKind.Sequential)]
        private struct Win32MemoryRangeEntry
        {
            /// <summary>
            /// Starting address of the memory range
            /// </summary>
            public IntPtr VirtualAddress;

            /// <summary>
            /// Size of the memory range in bytes
            /// </summary>
            public UIntPtr NumberOfBytes;
        }

        /// <summary>
        /// Submits the ranges to PrefetchVirtualMemory in chunks of at most <see cref="MaxEntriesPerCall"/>.
        /// </summary>
        /// <exception cref="Win32Exception">When the OS call fails for a chunk.</exception>
        [SkipLocalsInit]
        public unsafe void SchedulePrefetch(ReadOnlySpan<UIntPtr> addresses, UIntPtr length)
        {
            // Nothing to do; also avoids calling the OS with zero entries.
            if (addresses.IsEmpty)
            {
                return;
            }

            // One buffer reused by every chunk. With SkipLocalsInit it is not zeroed,
            // which is fine because each entry is fully written before it is handed to the OS.
            var capacity = Math.Min(addresses.Length, MaxEntriesPerCall);
            var entries = stackalloc Win32MemoryRangeEntry[capacity];
            var process = GetCurrentProcess();

            var remaining = addresses;
            while (remaining.IsEmpty == false)
            {
                var count = Math.Min(remaining.Length, capacity);

                for (var i = 0; i < count; i++)
                {
                    entries[i].VirtualAddress = (IntPtr)remaining[i];
                    entries[i].NumberOfBytes = length;
                }

                if (PrefetchVirtualMemory(process, (UIntPtr)(uint)count, entries, Reserved) == false)
                {
                    throw new Win32Exception(Marshal.GetLastWin32Error());
                }

                remaining = remaining[count..];
            }
        }
    }

    private interface IMemoryManager
    {
        /// <summary>
        /// Schedules an OS dependent prefetch.
        /// </summary>
        void SchedulePrefetch(ReadOnlySpan<UIntPtr> addresses, UIntPtr length);
    }
}
7 changes: 7 additions & 0 deletions src/Paprika/Store/AbandonedList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ public bool TryGet(out DbAddress reused, uint minBatchId, IBatchContext batch)

if (current.TryPop(out reused))
{
// Schedule prefetching next if possible
if (current.TryPeek(out var next, out _))
{
Debug.Assert(next.IsNull == false, "Next should not be NULL here");
batch.Prefetch(next);
}

return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Paprika/Store/BatchContextBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ abstract class BatchContextBase(uint batchId) : IBatchContext
public uint BatchId { get; } = batchId;

public abstract Page GetAt(DbAddress address);
public abstract void Prefetch(DbAddress address);
/// <summary>
/// Requests a prefetch of the given addresses. Abstract: every concrete batch context supplies the implementation.
/// </summary>
public abstract void Prefetch(ReadOnlySpan<DbAddress> addresses);

public abstract DbAddress GetAddress(Page page);

Expand Down
27 changes: 11 additions & 16 deletions src/Paprika/Store/IBatchContext.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Paprika.Crypto;

namespace Paprika.Store;
Expand Down Expand Up @@ -126,29 +128,22 @@ public interface IPageResolver
/// </summary>
Page GetAt(DbAddress address);

void Prefetch(DbAddress address);
/// <summary>
/// Prefetches a single address by wrapping it in a one-element span and forwarding to the span-based overload.
/// </summary>
void Prefetch(DbAddress address)
{
    var single = MemoryMarshal.CreateReadOnlySpan(ref address, 1);
    Prefetch(single);
}

void Prefetch(ReadOnlySpan<DbAddress> addresses)
{
foreach (var bucket in addresses)
{
if (!bucket.IsNull)
{
Prefetch(bucket);
}
}
}
/// <summary>
/// Requests a prefetch of the given addresses. This member has no default body, so implementers must provide it.
/// </summary>
void Prefetch(ReadOnlySpan<DbAddress> addresses);

/// <summary>
/// Prefetches all non-null addresses of a fixed-size address list with a single batched call.
/// </summary>
/// <typeparam name="TAddressList">The address list type; its static <c>Length</c> sizes the scratch buffer.</typeparam>
/// <param name="addresses">The list whose entries should be prefetched. Null entries are skipped.</param>
[SkipLocalsInit]
void Prefetch<TAddressList>(in TAddressList addresses)
    where TAddressList : struct, DbAddressList.IDbAddressList
{
    // Scratch buffer for the batch. It is not zeroed (SkipLocalsInit), which is safe because
    // only the prefix [0, count) that is written below is ever passed on.
    Span<DbAddress> span = stackalloc DbAddress[TAddressList.Length];
    var count = 0;

    // Collect the addresses once and issue one batched prefetch, instead of prefetching each
    // address individually and then prefetching the whole span a second time.
    for (var i = 0; i < TAddressList.Length; i++)
    {
        var addr = addresses[i];
        if (!addr.IsNull)
        {
            span[count++] = addr;
        }
    }

    if (count > 0)
    {
        Prefetch(span[..count]);
    }
}
}
65 changes: 65 additions & 0 deletions src/Paprika/Store/PageManagers/MemoryMappedPageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.IO.MemoryMappedFiles;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading.Channels;
using System.Xml.Serialization;
using Paprika.Utils;

namespace Paprika.Store.PageManagers;
Expand All @@ -24,6 +27,18 @@ public sealed class MemoryMappedPageManager : PointerPageManager
private readonly MemoryMappedViewAccessor _whole;
private readonly unsafe byte* _ptr;

// Prefetcher
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New to Github PR reviews, can't find a way to comment on the existing line of code.
But anyway, the comment above describing that "prefetching is futile" needs to be updated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it could be made more specific as this is about the Sequential flag as we don't do Sequential because clearly it's a random access all over the place. Will amend.

private const int PrefetcherCapacity = 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to make this a configurable parameter instead of a hardcoded value?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, could that configuration parameter be used to turn off prefetching altogether in case it is not required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing, will add such capability.

// Queue of addresses waiting to be prefetched. It holds at most PrefetcherCapacity entries.
private readonly Channel<DbAddress> _prefetches = Channel.CreateBounded<DbAddress>(new BoundedChannelOptions(PrefetcherCapacity)
{
// When the queue is full, the oldest queued address is discarded to make room for the new one.
FullMode = BoundedChannelFullMode.DropOldest,
// Configured for one reader and any number of writers.
SingleReader = true,
SingleWriter = false,
Capacity = PrefetcherCapacity,
AllowSynchronousContinuations = false
});
private readonly Task _prefetcher;

// Flusher section
private readonly Stack<PageMemoryOwner> _owners = new();
private readonly List<PageMemoryOwner> _ownersUsed = new();
Expand Down Expand Up @@ -72,6 +87,8 @@ public unsafe MemoryMappedPageManager(long size, byte historyDepth, string dir,
_meter = new Meter("Paprika.Store.PageManager");
_fileWrites = _meter.CreateHistogram<int>("File writes", "Syscall", "Actual numbers of file writes issued");
_writeTime = _meter.CreateHistogram<int>("Write time", "ms", "Time spent in writing");

// Task.Run unwraps the async delegate, so the stored task completes only when the prefetcher loop
// has finished. Task.Factory.StartNew would yield a Task<Task> whose outer task completes at the
// first await, so waiting on it would not wait for the loop.
_prefetcher = Task.Run(RunPrefetcher);
}

/// <summary>
/// Builds the path of the Paprika file by combining <paramref name="dir"/> with the Paprika file name.
/// </summary>
public static string GetPaprikaFilePath(string dir)
{
    return System.IO.Path.Combine(dir, PaprikaFileName);
}
Expand All @@ -80,6 +97,51 @@ public unsafe MemoryMappedPageManager(long size, byte historyDepth, string dir,

/// <summary>
/// Exposes the stored raw pointer to the base class.
/// </summary>
protected override unsafe void* Ptr
{
    get { return _ptr; }
}

/// <summary>
/// Enqueues every non-null address into the prefetch channel.
/// </summary>
/// <param name="addresses">Addresses to enqueue. Null entries are skipped.</param>
public override void Prefetch(ReadOnlySpan<DbAddress> addresses)
{
    var queue = _prefetches.Writer;

    for (var index = 0; index < addresses.Length; index++)
    {
        var candidate = addresses[index];
        if (candidate.IsNull)
        {
            continue;
        }

        // The outcome of the write is deliberately not inspected.
        queue.TryWrite(candidate);
    }
}

private async Task RunPrefetcher()
{
var reader = _prefetches.Reader;

while (await reader.WaitToReadAsync())
{
PrefetchImpl(reader, this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A basic question: Since the prefetcher runner thread is running asynchronously, is it theoretically possible that for a certain address it executes the prefetch after the actual read on that address is already done? Prefetcher will be able to find that the address is already in memory, but still these calls can effectively double the number of IOs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that it will run after the page is fetched. And yes, it's possible that it will be fetching over pages that are already there. We could then discuss whether to leave SSE-based soft prefetch and leave this implementation for heavier use.

The place where I'd still keep it as is would be the AbandonedList, which prepares the next page that will be copied to. In this case, the page is returned, then copied to, etc., while the next one is scheduled for a prefetch. This should give it enough time.

The next place where it works (just following the raw execution time) is visiting. We schedule prefetching for all the children up front and, as it walks in preorder, besides the first child the rest will most likely have time to be prefetched. Again, raw execution time shows a very nice boost (also, disk throughput is much higher).

The one place that makes it less easy to reason about is DataPage.TryGet. Maybe in this case we should use the SSE prefetch. If the page faults, it will take more time anyway than a search across the page (now ~10ns). So it might be the case that we'd like to skip issuing a costly OS call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying! Makes sense to me to use two different variants based on the usage 😄

}

// Takes up to maxPrefetch queued addresses from the reader, resolves each to its raw page pointer
// and hands the collected pointers to Platform.Prefetch in one call. Does nothing when the reader is empty.
[SkipLocalsInit]
static void PrefetchImpl(ChannelReader<DbAddress> reader, MemoryMappedPageManager manager)
{
    const int maxPrefetch = 128;

    // Not zeroed (SkipLocalsInit); only the filled prefix [0, taken) is passed on.
    Span<UIntPtr> pointers = stackalloc UIntPtr[maxPrefetch];
    var taken = 0;

    while (taken < maxPrefetch && reader.TryRead(out var address))
    {
        pointers[taken] = manager.GetAt(address).Raw;
        taken++;
    }

    if (taken == 0)
    {
        return;
    }

    Platform.Prefetch(pointers[..taken], Page.PageSize);
}
}

public override async ValueTask FlushPages(ICollection<DbAddress> dbAddresses, CommitOptions options)
{
if (_options == PersistenceOptions.MMapOnly)
Expand Down Expand Up @@ -182,6 +244,9 @@ public override void ForceFlush()

public override void Dispose()
{
_prefetches.Writer.Complete();
_prefetcher.Wait();

_meter.Dispose();

_whole.SafeMemoryMappedViewHandle.ReleasePointer();
Expand Down
17 changes: 10 additions & 7 deletions src/Paprika/Store/PageManagers/PointerPageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@ public abstract unsafe class PointerPageManager(long size) : IPageManager

protected abstract void* Ptr { get; }

/// <summary>
/// Issues a CPU prefetch hint for each usable address. Does nothing when SSE is not supported.
/// </summary>
/// <param name="addresses">
/// Addresses to prefetch. Null entries and entries beyond <c>MaxPage</c> are skipped.
/// </param>
public virtual void Prefetch(ReadOnlySpan<DbAddress> addresses)
{
    if (Sse.IsSupported == false)
    {
        return;
    }

    foreach (var address in addresses)
    {
        // Skip an unusable entry instead of returning, so that one null or out-of-range
        // address does not cancel the prefetch of every address that follows it.
        if (address.IsNull || address.Raw > (uint)MaxPage)
        {
            continue;
        }

        // Fetch to L2 cache as we don't know if will need it
        // So don't pollute L1 cache
        Sse.Prefetch1((byte*)Ptr + address.FileOffset);
    }
}

Expand Down
6 changes: 3 additions & 3 deletions src/Paprika/Store/PagedDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static PagedDb MemoryMappedDb(long size, byte historyDepth, string direct
new MemoryMappedPageManager(size, historyDepth, directory,
flushToDisk ? PersistenceOptions.FlushFile : PersistenceOptions.MMapOnly), historyDepth);

public void Prefetch(DbAddress address) => _manager.Prefetch(address);
/// <summary>
/// Forwards the prefetch request to the underlying page manager.
/// </summary>
public void Prefetch(ReadOnlySpan<DbAddress> addresses)
{
    _manager.Prefetch(addresses);
}

/// <summary>
/// Adds <paramref name="number"/> to the reads metric.
/// </summary>
private void ReportReads(long number)
{
    _reads.Add(number);
}

Expand Down Expand Up @@ -506,7 +506,7 @@ public IDictionary<Keccak, uint> IdCache
}
}

public void Prefetch(DbAddress address) => db.Prefetch(address);
/// <summary>
/// Forwards the prefetch request to the owning database.
/// </summary>
public void Prefetch(ReadOnlySpan<DbAddress> addresses)
{
    db.Prefetch(addresses);
}

/// <summary>
/// Resolves the page at <paramref name="address"/> through the database's page manager.
/// </summary>
public Page GetAt(DbAddress address)
{
    return db._manager.GetAt(address);
}

Expand Down Expand Up @@ -678,7 +678,7 @@ public override Page GetAt(DbAddress address)
return page;
}

public override void Prefetch(DbAddress address) => _db.Prefetch(address);
/// <summary>
/// Forwards the prefetch request to the database this batch belongs to.
/// </summary>
public override void Prefetch(ReadOnlySpan<DbAddress> addresses)
{
    _db.Prefetch(addresses);
}

/// <summary>
/// Asks the database for the address of <paramref name="page"/>.
/// </summary>
public override DbAddress GetAddress(Page page)
{
    return _db.GetAddress(page);
}

Expand Down
Loading