diff --git a/src/Paprika.Tests/Store/BasePageTests.cs b/src/Paprika.Tests/Store/BasePageTests.cs
index c0594da0..724c769c 100644
--- a/src/Paprika.Tests/Store/BasePageTests.cs
+++ b/src/Paprika.Tests/Store/BasePageTests.cs
@@ -26,9 +26,13 @@ internal class TestBatchContext(uint batchId, Stack? reusable = null)
     private uint _pageCount = 1U;
 
     public override Page GetAt(DbAddress address) => _address2Page[address];
 
-    public override void Prefetch(DbAddress address)
-    { }
+    // Prefetching is a no-op in the test context.
+    public override void Prefetch(DbAddress address, PrefetchMode mode)
+    { }
+
+    public override void Prefetch(ReadOnlySpan<DbAddress> addresses)
+    { }
 
     public override DbAddress GetAddress(Page page) => _page2Address[page.Raw];
 
diff --git a/src/Paprika/Platform.cs b/src/Paprika/Platform.cs
new file mode 100644
index 00000000..b5ecd49f
--- /dev/null
+++ b/src/Paprika/Platform.cs
@@ -0,0 +1,141 @@
+using System.ComponentModel;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+using AddressRange = (System.UIntPtr, uint);
+
+namespace Paprika;
+
+/// <summary>
+/// OS dependent memory helpers. The implementation is selected once, when the type is initialized.
+/// </summary>
+public static class Platform
+{
+    /// <summary>
+    /// Issues an OS level prefetch hint for every (start address, length in bytes) range.
+    /// </summary>
+    public static void Prefetch(ReadOnlySpan<AddressRange> ranges) => Manager.SchedulePrefetch(ranges);
+
+    private static readonly IMemoryManager Manager =
+        IsPosix() ? new PosixMemoryManager() : new WindowsMemoryManager();
+
+    private static bool IsPosix() => RuntimeInformation.IsOSPlatform(OSPlatform.Linux) ||
+                                     RuntimeInformation.IsOSPlatform(OSPlatform.OSX);
+
+    private sealed class PosixMemoryManager : IMemoryManager
+    {
+        // Not a [Flags] enum: madvise takes a single advice value and 0x3 / 0x4 are not bit masks.
+        private enum Advice : int
+        {
+            // ReSharper disable InconsistentNaming
+            /// <summary>
+            /// Expect access in the near future. (Hence, it might be a good idea to read some pages ahead.)
+            /// </summary>
+            MADV_WILLNEED = 0x3,
+
+            /// <summary>
+            /// Do not expect access in the near future.
+            /// (For the time being, the application is finished with the given range,
+            /// so the kernel can free resources associated with it.)
+            /// </summary>
+            MADV_DONTNEED = 0x4,
+            // ReSharper restore InconsistentNaming
+        }
+
+        // madvise is exported by the C standard library ("libc").
+        // SetLastError = true makes the runtime capture errno right after the call,
+        // so it is read with Marshal.GetLastPInvokeError() instead of a second P/Invoke.
+        [DllImport("libc", SetLastError = true)]
+        private static extern int madvise(IntPtr addr, UIntPtr length, Advice advice);
+
+        /// <summary>
+        /// Calls madvise with <see cref="Advice.MADV_WILLNEED"/> for every range.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">madvise failed for one of the ranges.</exception>
+        public void SchedulePrefetch(ReadOnlySpan<AddressRange> ranges)
+        {
+            const int success = 0;
+
+            foreach (var (address, length) in ranges)
+            {
+                if (madvise((IntPtr)address, length, Advice.MADV_WILLNEED) != success)
+                {
+                    throw new InvalidOperationException(
+                        $"{nameof(madvise)} failed with the following error: {Marshal.GetLastPInvokeError()}");
+                }
+            }
+        }
+    }
+
+    private sealed class WindowsMemoryManager : IMemoryManager
+    {
+        // Upper bound of entries passed in a single call, it also bounds the stack buffer.
+        private const int MaxEntriesPerCall = 128;
+
+        // NumberOfEntries is ULONG_PTR in the Win32 signature, hence the pointer sized UIntPtr.
+        [DllImport("kernel32.dll", SetLastError = true)]
+        private static extern unsafe bool PrefetchVirtualMemory(IntPtr hProcess, UIntPtr numberOfEntries,
+            Win32MemoryRangeEntry* entries, int flags);
+
+        [StructLayout(LayoutKind.Sequential)]
+        private struct Win32MemoryRangeEntry
+        {
+            /// <summary>
+            /// Starting address of the memory range
+            /// </summary>
+            public IntPtr VirtualAddress;
+
+            /// <summary>
+            /// Size of the memory range in bytes
+            /// </summary>
+            public UIntPtr NumberOfBytes;
+        }
+
+        /// <summary>
+        /// Passes the ranges to PrefetchVirtualMemory in batches of up to <see cref="MaxEntriesPerCall"/>.
+        /// An empty span results in no system call.
+        /// </summary>
+        /// <exception cref="Win32Exception">PrefetchVirtualMemory reported a failure.</exception>
+        [SkipLocalsInit]
+        public unsafe void SchedulePrefetch(ReadOnlySpan<AddressRange> ranges)
+        {
+            if (ranges.IsEmpty)
+                return;
+
+            const int reserved = 0;
+
+            var entries = stackalloc Win32MemoryRangeEntry[MaxEntriesPerCall];
+
+            // Process.GetCurrentProcess() creates a new disposable object on each call, release it when done.
+            using var process = Process.GetCurrentProcess();
+            var handle = process.Handle;
+
+            while (ranges.IsEmpty == false)
+            {
+                var count = Math.Min(ranges.Length, MaxEntriesPerCall);
+
+                for (var i = 0; i < count; i++)
+                {
+                    entries[i].VirtualAddress = (IntPtr)ranges[i].Item1;
+                    entries[i].NumberOfBytes = ranges[i].Item2;
+                }
+
+                if (PrefetchVirtualMemory(handle, (UIntPtr)count, entries, reserved) == false)
+                {
+                    throw new Win32Exception(Marshal.GetLastWin32Error());
+                }
+
+                ranges = ranges[count..];
+            }
+        }
+    }
+
+    private interface IMemoryManager
+    {
+        /// <summary>
+        /// Schedules an OS dependent prefetch.
+        /// </summary>
+        void SchedulePrefetch(ReadOnlySpan<AddressRange> addresses);
+    }
+}
diff --git a/src/Paprika/Store/AbandonedList.cs b/src/Paprika/Store/AbandonedList.cs
index 6df5c11c..f0755858 100644
--- a/src/Paprika/Store/AbandonedList.cs
+++ b/src/Paprika/Store/AbandonedList.cs
@@ -131,6 +131,16 @@ public bool TryGet(out DbAddress reused, uint minBatchId, IBatchContext batch)
 
         if (current.TryPop(out reused))
         {
+            // Schedule prefetching next if possible
+            if (current.TryPeek(out var next, out _))
+            {
+                Debug.Assert(next.IsNull == false, "Next should not be NULL here");
+
+                // Expect that the page that will be prefetched here was not used for a while.
+                // It's ok to wait a bit to have it prefetched.
+                batch.Prefetch(next, PrefetchMode.Heavy);
+            }
+
             return true;
         }
 
diff --git a/src/Paprika/Store/BatchContextBase.cs b/src/Paprika/Store/BatchContextBase.cs
index a7da7680..febbef9d 100644
--- a/src/Paprika/Store/BatchContextBase.cs
+++ b/src/Paprika/Store/BatchContextBase.cs
@@ -10,7 +10,9 @@ abstract class BatchContextBase(uint batchId) : IBatchContext
     public uint BatchId { get; } = batchId;
 
     public abstract Page GetAt(DbAddress address);
-    public abstract void Prefetch(DbAddress address);
+    public abstract void Prefetch(DbAddress address, PrefetchMode mode);
+
+    public abstract void Prefetch(ReadOnlySpan<DbAddress> addresses);
 
     public abstract DbAddress GetAddress(Page page);
 
diff --git a/src/Paprika/Store/DataPage.cs b/src/Paprika/Store/DataPage.cs
index f05dd3bc..d871939a 100644
--- a/src/Paprika/Store/DataPage.cs
+++ b/src/Paprika/Store/DataPage.cs
@@ -364,9 +364,10 @@ private static bool TryGet(IPageResolver batch, scoped in NibblePath key, out Re
             {
                 // As the CPU does not auto-prefetch across page boundaries
                 // Prefetch child page in case we go there next to reduce CPU stalls
+                // Do it using soft mode as quite likely the page is loaded to RAM but possibly not in CPU cache.
                 bucket = dataPage.Data.Buckets[GetIndex(sliced)];
                 if (bucket.IsNull == false)
-                    batch.Prefetch(bucket);
+                    batch.Prefetch(bucket, PrefetchMode.Soft);
             }
 
             // try regular map
diff --git a/src/Paprika/Store/IBatchContext.cs b/src/Paprika/Store/IBatchContext.cs
index a45cc739..08502658 100644
--- a/src/Paprika/Store/IBatchContext.cs
+++ b/src/Paprika/Store/IBatchContext.cs
@@ -1,4 +1,5 @@
 using System.Diagnostics;
+using System.Runtime.CompilerServices;
 using Paprika.Crypto;
 
 namespace Paprika.Store;
@@ -124,29 +125,44 @@ public interface IPageResolver
     /// </summary>
     Page GetAt(DbAddress address);
 
-    void Prefetch(DbAddress address);
+    /// <summary>
+    /// Issues a prefetch request for the page at the specific location
+    /// using mechanism defined by the <paramref name="mode"/>.
+    /// </summary>
+    void Prefetch(DbAddress address, PrefetchMode mode);
 
-    void Prefetch(ReadOnlySpan<DbAddress> addresses)
-    {
-        foreach (var bucket in addresses)
-        {
-            if (!bucket.IsNull)
-            {
-                Prefetch(bucket);
-            }
-        }
-    }
+    /// <summary>
+    /// Issues a prefetch request for a set of pages residing at <paramref name="addresses"/>.
+    /// NULL addresses are skipped; the prefetch mechanism is chosen by the implementation.
+    /// </summary>
+    void Prefetch(ReadOnlySpan<DbAddress> addresses);
 
+    [SkipLocalsInit]
     void Prefetch<TAddressList>(in TAddressList addresses)
         where TAddressList : struct, DbAddressList.IDbAddressList
     {
+        Span<DbAddress> span = stackalloc DbAddress[TAddressList.Length];
+
+        // Copy all entries, NULL ones included; implementations filter them out.
         for (var i = 0; i < TAddressList.Length; i++)
         {
-            var addr = addresses[i];
-            if (!addr.IsNull)
-            {
-                Prefetch(addr);
-            }
+            span[i] = addresses[i];
         }
+
+        Prefetch(span);
     }
 }
+
+public enum PrefetchMode
+{
+    /// <summary>
+    /// Expects that the page was not evicted and only should be brought to CPU cache using SSE prefetch.
+    /// </summary>
+    Soft,
+
+    /// <summary>
+    /// Expects that the page was not accessed lately or was evicted from the memory.
+    /// The page should be prefetched using platform specific heavy prefetch <see cref="Platform.Prefetch"/>.
+    /// </summary>
+    Heavy,
+}
diff --git a/src/Paprika/Store/PageManagers/MemoryMappedPageManager.cs b/src/Paprika/Store/PageManagers/MemoryMappedPageManager.cs
index c4d14914..958ef1c9 100644
--- a/src/Paprika/Store/PageManagers/MemoryMappedPageManager.cs
+++ b/src/Paprika/Store/PageManagers/MemoryMappedPageManager.cs
@@ -2,7 +2,9 @@
 using System.Diagnostics;
 using System.Diagnostics.Metrics;
 using System.IO.MemoryMappedFiles;
+using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
+using System.Threading.Channels;
 using Paprika.Utils;
 
 namespace Paprika.Store.PageManagers;
@@ -12,8 +14,10 @@ public sealed class MemoryMappedPageManager : PointerPageManager
     private readonly PersistenceOptions _options;
 
     /// <summary>
-    /// The only option is random access. As Paprika jumps over the file, any prefetching is futile.
-    /// Also, the file cannot be async to use some of the mmap features. So here it is, random access file.
+    /// As Paprika jumps to various addresses in the file, using <see cref="FileOptions.SequentialScan"/>
+    /// would be harmful and <see cref="FileOptions.RandomAccess"/> is used.
+    ///
+    /// The file uses <see cref="FileOptions.Asynchronous"/> to issue proper async operations.
     /// </summary>
     private const FileOptions PaprikaFileOptions = FileOptions.RandomAccess | FileOptions.Asynchronous;
 
@@ -24,6 +28,18 @@ public sealed class MemoryMappedPageManager : PointerPageManager
     private readonly MemoryMappedViewAccessor _whole;
     private readonly unsafe byte* _ptr;
 
+    // Prefetcher
+    private const int PrefetcherCapacity = 1000;
+    private readonly Channel<DbAddress> _prefetches = Channel.CreateBounded<DbAddress>(new BoundedChannelOptions(PrefetcherCapacity)
+    {
+        FullMode = BoundedChannelFullMode.DropOldest,
+        SingleReader = true,
+        SingleWriter = false,
+        Capacity = PrefetcherCapacity,
+        AllowSynchronousContinuations = false
+    });
+    private readonly Task _prefetcher;
+
     // Flusher section
     private readonly Stack _owners = new();
     private readonly List _ownersUsed = new();
@@ -72,6 +88,10 @@ public unsafe MemoryMappedPageManager(long size, byte historyDepth, string dir,
         _meter = new Meter("Paprika.Store.PageManager");
         _fileWrites = _meter.CreateHistogram("File writes", "Syscall", "Actual numbers of file writes issued");
         _writeTime = _meter.CreateHistogram("Write time", "ms", "Time spent in writing");
+
+        // Task.Run unwraps the Task returned by the async method; Task.Factory.StartNew would hand back
+        // only the outer task, which completes at the first await, making Dispose's wait a no-op.
+        _prefetcher = Task.Run(RunPrefetcher);
     }
 
     public static string GetPaprikaFilePath(string dir) => System.IO.Path.Combine(dir, PaprikaFileName);
@@ -80,6 +100,72 @@ public unsafe MemoryMappedPageManager(long size, byte historyDepth, string dir,
 
     protected override unsafe void* Ptr => _ptr;
 
+    /// <summary>
+    /// Queues the address for the background prefetcher. NULL addresses are ignored.
+    /// </summary>
+    protected override void PrefetchHeavy(DbAddress address)
+    {
+        if (address.IsNull == false)
+        {
+            _prefetches.Writer.TryWrite(address);
+        }
+    }
+
+    /// <summary>
+    /// Queues all non-NULL addresses for the background prefetcher.
+    /// </summary>
+    public override void Prefetch(ReadOnlySpan<DbAddress> addresses)
+    {
+        var writer = _prefetches.Writer;
+
+        foreach (var address in addresses)
+        {
+            if (address.IsNull == false)
+            {
+                writer.TryWrite(address);
+            }
+        }
+    }
+
+    /// <summary>
+    /// Drains the prefetch queue in batches until the channel is completed.
+    /// </summary>
+    private async Task RunPrefetcher()
+    {
+        var reader = _prefetches.Reader;
+
+        while (await reader.WaitToReadAsync())
+        {
+            PrefetchImpl(reader, this);
+        }
+
+        return;
+
+        [SkipLocalsInit]
+        static void PrefetchImpl(ChannelReader<DbAddress> reader, MemoryMappedPageManager manager)
+        {
+            const int maxPrefetch = 128;
+
+            Span<(UIntPtr, uint)> span = stackalloc (UIntPtr, uint)[maxPrefetch];
+            var i = 0;
+
+            for (; i < maxPrefetch; i++)
+            {
+                if (reader.TryRead(out var address) == false)
+                    break;
+
+                span[i] = (manager.GetAt(address).Raw, Page.PageSize);
+            }
+
+            if (i == 0)
+                return;
+
+            // TODO: potentially sort and merge consecutive chunks
+
+            Platform.Prefetch(span[..i]);
+        }
+    }
+
     public override async ValueTask WritePages(ICollection dbAddresses, CommitOptions options)
     {
         if (_options == PersistenceOptions.MMapOnly)
@@ -182,6 +268,17 @@ public override void ForceFlush()
 
     public override void Dispose()
     {
+        _prefetches.Writer.Complete();
+
+        try
+        {
+            _prefetcher.Wait();
+        }
+        catch (AggregateException)
+        {
+            // Prefetching is a best-effort hint: a faulted prefetcher must not prevent releasing the mapping.
+        }
+
         _meter.Dispose();
 
         _whole.SafeMemoryMappedViewHandle.ReleasePointer();
diff --git a/src/Paprika/Store/PageManagers/NativeMemoryPageManager.cs b/src/Paprika/Store/PageManagers/NativeMemoryPageManager.cs
index 12258653..0377e701 100644
--- a/src/Paprika/Store/PageManagers/NativeMemoryPageManager.cs
+++ b/src/Paprika/Store/PageManagers/NativeMemoryPageManager.cs
@@ -19,6 +19,26 @@ public NativeMemoryPageManager(long size, byte historyDepth) : base(size)
 
     protected override void* Ptr => _ptr;
 
+    /// <summary>
+    /// Heavy prefetch falls back to the soft one for this manager.
+    /// </summary>
+    protected override void PrefetchHeavy(DbAddress address) => PrefetchSoft(address);
+
+    /// <summary>
+    /// Soft-prefetches every non-NULL address.
+    /// </summary>
+    public override void Prefetch(ReadOnlySpan<DbAddress> addresses)
+    {
+        foreach (var address in addresses)
+        {
+            // Address lists are copied as-is by IPageResolver.Prefetch, so NULL entries can show up here.
+            if (address.IsNull == false)
+            {
+                PrefetchSoft(address);
+            }
+        }
+    }
+
     public override void Flush()
     {
     }
diff --git a/src/Paprika/Store/PageManagers/PointerPageManager.cs b/src/Paprika/Store/PageManagers/PointerPageManager.cs
index 37b09869..2fff5420 100644
--- a/src/Paprika/Store/PageManagers/PointerPageManager.cs
+++ b/src/Paprika/Store/PageManagers/PointerPageManager.cs
@@ -11,7 +11,35 @@ public abstract unsafe class PointerPageManager(long size) : IPageManager
 
     protected abstract void* Ptr { get; }
 
-    public void Prefetch(DbAddress address)
+    /// <summary>
+    /// Prefetches the page at <paramref name="address"/> with the requested <paramref name="mode"/>.
+    /// </summary>
+    public void Prefetch(DbAddress address, PrefetchMode mode)
+    {
+        if (mode == PrefetchMode.Soft)
+        {
+            PrefetchSoft(address);
+        }
+        else
+        {
+            PrefetchHeavy(address);
+        }
+    }
+
+    /// <summary>
+    /// Manager-specific implementation of <see cref="PrefetchMode.Heavy"/>.
+    /// </summary>
+    protected abstract void PrefetchHeavy(DbAddress address);
+
+    /// <summary>
+    /// Prefetches a set of pages, the mechanism is chosen by the manager.
+    /// </summary>
+    public abstract void Prefetch(ReadOnlySpan<DbAddress> addresses);
+
+    /// <summary>
+    /// Implementation of <see cref="PrefetchMode.Soft"/>.
+    /// </summary>
+    protected void PrefetchSoft(DbAddress address)
     {
         if (Sse.IsSupported)
         {
diff --git a/src/Paprika/Store/PagedDb.cs b/src/Paprika/Store/PagedDb.cs
index eca16831..abb5505f 100644
--- a/src/Paprika/Store/PagedDb.cs
+++ b/src/Paprika/Store/PagedDb.cs
@@ -126,7 +126,9 @@ public static PagedDb MemoryMappedDb(long size, byte historyDepth, string direct
             new MemoryMappedPageManager(size, historyDepth, directory,
                 flushToDisk ? PersistenceOptions.FlushFile : PersistenceOptions.MMapOnly), historyDepth);
 
-    public void Prefetch(DbAddress address) => _manager.Prefetch(address);
+    public void Prefetch(DbAddress address, PrefetchMode mode) => _manager.Prefetch(address, mode);
+
+    public void Prefetch(ReadOnlySpan<DbAddress> addresses) => _manager.Prefetch(addresses);
 
     private void ReportReads(long number) => _reads.Add(number);
 
@@ -506,7 +508,9 @@ public IDictionary IdCache
             }
         }
 
-        public void Prefetch(DbAddress address) => db.Prefetch(address);
+        public void Prefetch(DbAddress address, PrefetchMode mode) => db.Prefetch(address, mode);
+
+        public void Prefetch(ReadOnlySpan<DbAddress> addresses) => db.Prefetch(addresses);
 
         public Page GetAt(DbAddress address) => db._manager.GetAt(address);
 
@@ -676,7 +680,9 @@ public override Page GetAt(DbAddress address)
             return page;
         }
 
-        public override void Prefetch(DbAddress address) => _db.Prefetch(address);
+        public override void Prefetch(DbAddress address, PrefetchMode mode) => _db.Prefetch(address, mode);
+
+        public override void Prefetch(ReadOnlySpan<DbAddress> addresses) => _db.Prefetch(addresses);
 
         public override DbAddress GetAddress(Page page) => _db.GetAddress(page);
 
diff --git a/src/Paprika/Store/StorageFanOut.cs b/src/Paprika/Store/StorageFanOut.cs
index a07b5185..d5b57711 100644
--- a/src/Paprika/Store/StorageFanOut.cs
+++ b/src/Paprika/Store/StorageFanOut.cs
@@ -653,12 +653,12 @@ public void Accept(ref NibblePath.Builder builder, IPageVisitor visitor, IPageRe
                 new BottomPage(root).Accept(ref builder, visitor, resolver, Root);
             }
 
-            public void Prefetch(IPageResolver resolver)
+            public void PrefetchForAccept(IPageResolver resolver)
             {
                 if (Root.IsNull)
                     return;
 
-                resolver.Prefetch(Root);
+                resolver.Prefetch(Root, PrefetchMode.Heavy);
             }
         }
 
@@ -668,7 +668,7 @@ public void Accept(ref NibblePath.Builder builder, IPageVisitor visitor, IPageRe
 
             foreach (ref var bucket in Data.Buckets)
             {
-                bucket.Prefetch(resolver);
+                bucket.PrefetchForAccept(resolver);
             }
 
             foreach (ref var bucket in Data.Buckets)