From d73e7bb1dbc12bc46ebb41a6ec610aa285383dc0 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Thu, 21 Nov 2024 12:41:58 +0000 Subject: [PATCH 1/3] Unbalanced Parallel (#436) * Unbalanced Parallel * Missed one * Avoid closures * Use Volatile * Use base class * Use ManualResetEventSlim * Use SemaphoreSlim * Add padding --------- Co-authored-by: lukasz.rozmej --- src/Paprika/Data/BitMapFilter.cs | 19 +- src/Paprika/Merkle/ComputeMerkleBehavior.cs | 6 +- src/Paprika/Utils/ParallelUnbalancedWork.cs | 306 ++++++++++++++++++++ 3 files changed, 321 insertions(+), 10 deletions(-) create mode 100644 src/Paprika/Utils/ParallelUnbalancedWork.cs diff --git a/src/Paprika/Data/BitMapFilter.cs b/src/Paprika/Data/BitMapFilter.cs index 8f3dd1d7..6c6128b6 100644 --- a/src/Paprika/Data/BitMapFilter.cs +++ b/src/Paprika/Data/BitMapFilter.cs @@ -6,6 +6,7 @@ using System.Runtime.Intrinsics.X86; using Paprika.Chain; using Paprika.Store; +using Paprika.Utils; namespace Paprika.Data; @@ -182,12 +183,12 @@ public void OrWith(in OfN other) [Pure] public void OrWith(OfN[] others) { - var pages = _pages; + State state = new State(others, _pages); - Parallel.For(0, PageCount, i => + ParallelUnbalancedWork.For(0, PageCount, state, static (i, s) => { - var page = pages[i]; - var length = others.Length; + var page = s.Pages[i]; + var length = s.Others.Length; for (var j = 0; j < length - 1; j++) { @@ -196,17 +197,21 @@ public void OrWith(OfN[] others) // prefetch next unsafe { - Sse.Prefetch2(others[j + 1]._pages[i].Payload); + Sse.Prefetch2(s.Others[j + 1]._pages[i].Payload); } } - page.OrWith(others[j]._pages[i]); + page.OrWith(s.Others[j]._pages[i]); } - page.OrWith(others[length - 1]._pages[i]); + page.OrWith(s.Others[length - 1]._pages[i]); + + return s; }); } + private readonly record struct State(OfN[] Others, Page[] Pages); + public int BucketCount => Page.PageSize * BitsPerByte * PageCount; private static int PageCount => TSize.Count; diff --git a/src/Paprika/Merkle/ComputeMerkleBehavior.cs b/src/Paprika/Merkle/ComputeMerkleBehavior.cs index acf6cbff..b15d1324 100644 --- a/src/Paprika/Merkle/ComputeMerkleBehavior.cs +++ b/src/Paprika/Merkle/ComputeMerkleBehavior.cs @@ -261,9 +261,9 @@ private static void ScatterGather(ICommit commit, BuildStorageTriesItem[] workIt } var children = new ConcurrentQueue(); - Parallel.For(0, workItems.Length, s_parallelOptions, + ParallelUnbalancedWork.For(0, workItems.Length, s_parallelOptions, commit.GetChild, - (i, _, child) => + (i, child) => { workItems[i].DoWork(child); return child; @@ -605,7 +605,7 @@ private void EncodeBranch(scoped in Key key, scoped in ComputeContext ctx, scope var budget = ctx.Budget; // parallel calculation - Parallel.For((long)0, NibbleSet.NibbleCount, s_parallelOptions, nibble => + ParallelUnbalancedWork.For(0, NibbleSet.NibbleCount, s_parallelOptions, nibble => { var childPath = NibblePath.Single((byte)nibble, 0); diff --git a/src/Paprika/Utils/ParallelUnbalancedWork.cs b/src/Paprika/Utils/ParallelUnbalancedWork.cs new file mode 100644 index 00000000..298e5244 --- /dev/null +++ b/src/Paprika/Utils/ParallelUnbalancedWork.cs @@ -0,0 +1,306 @@ +using System.Runtime.InteropServices; + +namespace Paprika.Utils; + +/// +/// Provides methods to execute parallel loops efficiently for unbalanced workloads. +/// +public class ParallelUnbalancedWork : IThreadPoolWorkItem +{ + private static readonly ParallelOptions s_parallelOptions = new() + { + // default to the number of processors + MaxDegreeOfParallelism = Environment.ProcessorCount + }; + + private readonly Data _data; + + /// + /// Executes a parallel for loop over a range of integers. + /// + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// The delegate that is invoked once per iteration. + public static void For(int fromInclusive, int toExclusive, Action action) + => For(fromInclusive, toExclusive, s_parallelOptions, action); + + /// + /// Executes a parallel for loop over a range of integers, with the specified options. + /// + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// An object that configures the behavior of this operation. + /// The delegate that is invoked once per iteration. + public static void For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action action) + { + int threads = parallelOptions.MaxDegreeOfParallelism > 0 ? parallelOptions.MaxDegreeOfParallelism : Environment.ProcessorCount; + + Data data = new(threads, fromInclusive, toExclusive, action); + + for (int i = 0; i < threads - 1; i++) + { + ThreadPool.UnsafeQueueUserWorkItem(new ParallelUnbalancedWork(data), preferLocal: false); + } + + new ParallelUnbalancedWork(data).Execute(); + + // If there are still active threads, wait for them to complete + if (data.ActiveThreads > 0) + { + data.Event.Wait(); + } + } + + /// + /// Executes a parallel for loop over a range of integers, with thread-local data, initialization, and finalization functions. + /// + /// The type of the thread-local data. + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// An object that configures the behavior of this operation. + /// The function to initialize the local data for each thread. + /// The delegate that is invoked once per iteration. + /// The function to finalize the local data for each thread. + public static void For( + int fromInclusive, + int toExclusive, + ParallelOptions parallelOptions, + Func init, + Func action, + Action @finally) + => InitProcessor.For(fromInclusive, toExclusive, parallelOptions, init, default, action, @finally); + + /// + /// Executes a parallel for loop over a range of integers, with thread-local data. + /// + /// The type of the thread-local data. + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// The initial state of the thread-local data. + /// The delegate that is invoked once per iteration. + public static void For(int fromInclusive, int toExclusive, TLocal state, Func action) + => For(fromInclusive, toExclusive, s_parallelOptions, state, action); + + /// + /// Executes a parallel for loop over a range of integers, with thread-local data and specified options. + /// + /// The type of the thread-local data. + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// An object that configures the behavior of this operation. + /// The initial state of the thread-local data. + /// The delegate that is invoked once per iteration. + public static void For( + int fromInclusive, + int toExclusive, + ParallelOptions parallelOptions, + TLocal state, + Func action) + => InitProcessor.For(fromInclusive, toExclusive, parallelOptions, null, state, action); + + /// + /// Initializes a new instance of the class. + /// + /// The shared data for the parallel work. + private ParallelUnbalancedWork(Data data) + { + _data = data; + } + + /// + /// Executes the parallel work item. + /// + public void Execute() + { + int i = _data.Index.GetNext(); + while (i < _data.ToExclusive) + { + _data.Action(i); + // Get the next index + i = _data.Index.GetNext(); + } + + // Signal that this thread has completed its work + _data.MarkThreadCompleted(); + } + + /// + /// Provides a thread-safe counter for sharing indices among threads. + /// + private class SharedCounter(int fromInclusive) + { + private PaddedValue _index = new(fromInclusive); + + /// + /// Gets the next index in a thread-safe manner. + /// + /// The next index. + public int GetNext() => Interlocked.Increment(ref _index.Value) - 1; + + [StructLayout(LayoutKind.Explicit, Size = 128)] + private struct PaddedValue(int value) + { + [FieldOffset(64)] + public int Value = value; + } + } + + /// + /// Represents the base data shared among threads during parallel execution. + /// + private class BaseData(int threads, int fromInclusive, int toExclusive) + { + /// + /// Gets the shared counter for indices. + /// + public SharedCounter Index { get; } = new SharedCounter(fromInclusive); + public SemaphoreSlim Event { get; } = new(initialCount: 0); + + /// + /// Gets the exclusive upper bound of the range. + /// + public int ToExclusive => toExclusive; + + /// + /// Gets the number of active threads. + /// + public int ActiveThreads => Volatile.Read(ref threads); + + /// + /// Marks a thread as completed. + /// + /// The number of remaining active threads. + public int MarkThreadCompleted() + { + var remaining = Interlocked.Decrement(ref threads); + + if (remaining == 0) + { + Event.Release(); + } + + return remaining; + } + } + + /// + /// Represents the data shared among threads for the parallel action. + /// + private class Data(int threads, int fromInclusive, int toExclusive, Action action) : + BaseData(threads, fromInclusive, toExclusive) + { + /// + /// Gets the action to be executed for each iteration. + /// + public Action Action => action; + } + + /// + /// Provides methods to execute parallel loops with thread-local data initialization and finalization. + /// + /// The type of the thread-local data. + private class InitProcessor : IThreadPoolWorkItem + { + private readonly Data _data; + + /// + /// Executes a parallel for loop over a range of integers, with thread-local data initialization and finalization. + /// + /// The inclusive lower bound of the range. + /// The exclusive upper bound of the range. + /// An object that configures the behavior of this operation. + /// The function to initialize the local data for each thread. + /// The initial value of the local data. + /// The delegate that is invoked once per iteration. + /// The function to finalize the local data for each thread. + public static void For( + int fromInclusive, + int toExclusive, + ParallelOptions parallelOptions, + Func? init, + TLocal? initValue, + Func action, + Action? @finally = null) + { + // Determine the number of threads to use + var threads = parallelOptions.MaxDegreeOfParallelism > 0 + ? parallelOptions.MaxDegreeOfParallelism + : Environment.ProcessorCount; + + // Create shared data with thread-local initializers and finalizers + var data = new Data(threads, fromInclusive, toExclusive, action, init, initValue, @finally); + + // Queue work items to the thread pool for all threads except the current one + for (int i = 0; i < threads - 1; i++) + { + ThreadPool.UnsafeQueueUserWorkItem(new InitProcessor(data), preferLocal: false); + } + + // Execute work on the current thread + new InitProcessor(data).Execute(); + + // If there are still active threads, wait for them to complete + if (data.ActiveThreads > 0) + { + data.Event.Wait(); + } + } + + /// + /// Initializes a new instance of the class. + /// + /// The shared data for the parallel work. + private InitProcessor(Data data) => _data = data; + + /// + /// Executes the parallel work item with thread-local data. + /// + public void Execute() + { + TLocal? value = _data.Init(); + int i = _data.Index.GetNext(); + while (i < _data.ToExclusive) + { + value = _data.Action(i, value); + i = _data.Index.GetNext(); + } + + _data.Finally(value); + + _data.MarkThreadCompleted(); + } + + /// + /// Represents the data shared among threads for the parallel action with thread-local data. + /// + /// The type of the thread-local data. + private class Data(int threads, + int fromInclusive, + int toExclusive, + Func action, + Func? init = null, + TValue? initValue = default, + Action? @finally = null) : BaseData(threads, fromInclusive, toExclusive) + { + /// + /// Gets the action to be executed for each iteration. + /// + public Func Action => action; + + /// + /// Initializes the thread-local data. + /// + /// The initialized thread-local data. + public TValue Init() => initValue ?? (init is not null ? init.Invoke() : default)!; + + /// + /// Finalizes the thread-local data. + /// + /// The thread-local data to finalize. + public void Finally(TValue value) + { + @finally?.Invoke(value); + } + } + } +} From 88b30db593418ed89847e34aabb237683a1da97f Mon Sep 17 00:00:00 2001 From: Szymon Kulec Date: Thu, 21 Nov 2024 14:29:23 +0100 Subject: [PATCH 2/3] BitFilter clearing made parallel (#440) --- src/Paprika/Data/BitMapFilter.cs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/Paprika/Data/BitMapFilter.cs b/src/Paprika/Data/BitMapFilter.cs index 6c6128b6..228cf62c 100644 --- a/src/Paprika/Data/BitMapFilter.cs +++ b/src/Paprika/Data/BitMapFilter.cs @@ -26,10 +26,12 @@ public static BitMapFilter> CreateOfN(BufferPool pool) var pages = new Page[TSize.Count]; for (var i = 0; i < TSize.Count; i++) { - pages[i] = pool.Rent(true); + pages[i] = pool.Rent(false); } - return new BitMapFilter>(new OfN(pages)); + var accessor = new OfN(pages); + accessor.Clear(); + return new BitMapFilter>(accessor); } public interface IAccessor @@ -151,10 +153,11 @@ public unsafe ref int GetSlot(uint hash) public void Clear() { - foreach (var page in _pages) + ParallelUnbalancedWork.For(0, PageCount, _pages, static (i, state) => { - page.Clear(); - } + state[i].Clear(); + return state; + }); } public void Return(BufferPool pool) From 3eff92a76dbfa566b01d7697c831416f0cefea0f Mon Sep 17 00:00:00 2001 From: Cypher Pepe <125112044+cypherpepe@users.noreply.github.com> Date: Tue, 26 Nov 2024 12:51:02 +0300 Subject: [PATCH 3/3] typos design.md (#441) --- docs/design.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/design.md b/docs/design.md index f1991226..39e2f382 100644 --- a/docs/design.md +++ b/docs/design.md @@ -46,7 +46,7 @@ blockchain.Finalize(Block2A); ### PagedDb -The `PagedDb` component is responsible for storing the left-fold of the blocks that are beyond the cut-off point. This database uses [memory-mapped files](https://en.wikipedia.org/wiki/Memory-mapped_file) to provide storing capabilities. To handle concurrency, [Copy on Write](https://en.wikipedia.org/wiki/Copy-on-write) is used. This allows multiple concurrent readers to cooperate in a full lock-free manner and a single writer that runs the current transaction. In that manner, it's heavily inspired by [LMBD](https://github.com/LMDB/lmdb). +The `PagedDb` component is responsible for storing the left-fold of the blocks that are beyond the cut-off point. This database uses [memory-mapped files](https://en.wikipedia.org/wiki/Memory-mapped_file) to provide storing capabilities. To handle concurrency, [Copy on Write](https://en.wikipedia.org/wiki/Copy-on-write) is used. This allows multiple concurrent readers to cooperate in a full lock-free manner and a single writer that runs the current transaction. In that manner, it's heavily inspired by [LMDB](https://github.com/LMDB/lmdb). It's worth to mention that due to the design of the `Blockchain` component, having a single writer available is sufficient. At the same time, having multiple readers allow to create readonly transactions that are later used by blocks from `Blockchain`. @@ -365,6 +365,6 @@ A few remarks: 1. Database Storage lectures by Andy Pavlo from CMU Intro to Database Systems / Fall 2022: 1. Database Storage, pt. 1 https://www.youtube.com/watch?v=df-l2PxUidI 1. Database Storage, pt. 2 https://www.youtube.com/watch?v=2HtfGdsrwqA -1. LMBD +1. LMDB 1. Howard Chu - LMDB [The Databaseology Lectures - CMU Fall 2015](https://www.youtube.com/watch?v=tEa5sAh-kVk) 1. The main file of LMDB [mdb.c](https://github.com/LMDB/lmdb/blob/mdb.master/libraries/liblmdb/mdb.c)