Skip to content

Commit

Permalink
locked on write dictionary
Browse files Browse the repository at this point in the history
  • Loading branch information
Scooletz committed Dec 19, 2024
1 parent 1f5a8b7 commit a5c7d41
Showing 1 changed file with 42 additions and 16 deletions.
58 changes: 42 additions & 16 deletions src/Paprika/Chain/PooledSpanDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ private SearchResult TryGetImpl(scoped ReadOnlySpan<byte> key, uint leftover, ui
{
Debug.Assert(BitOperations.LeadingZeroCount(leftover) >= 11, "First 10 bits should be left unused");

var address = _root[(int)bucket];
ref var location = ref _root[(int)bucket];

var address = Volatile.Read(ref location);

if (address == UIntPtr.Zero) goto NotFound;

do
Expand Down Expand Up @@ -279,8 +282,6 @@ private void SetImpl(scoped ReadOnlySpan<byte> key, uint mixed, ReadOnlySpan<byt

Debug.Assert(BitOperations.LeadingZeroCount(leftover) >= 10, "First 10 bits should be left unused");

UIntPtr root = _root[(int)bucket];

var dataLength = data1.Length + data0.Length;

var size = PreambleLength + AddressLength + KeyLengthLength + key.Length + ValueLengthLength + dataLength;
Expand All @@ -291,9 +292,6 @@ private void SetImpl(scoped ReadOnlySpan<byte> key, uint mixed, ReadOnlySpan<byt
destination[1] = (byte)(leftover >> 8);
destination[2] = (byte)(leftover & 0xFF);

// Write next
Unsafe.WriteUnaligned(ref destination[PreambleLength], root);

// Key length
const int keyStart = PreambleLength + AddressLength;
destination[keyStart] = (byte)key.Length;
Expand All @@ -309,7 +307,23 @@ private void SetImpl(scoped ReadOnlySpan<byte> key, uint mixed, ReadOnlySpan<byt
data0.CopyTo(destination[(valueStart + ValueLengthLength)..]);
data1.CopyTo(destination[(valueStart + ValueLengthLength + data0.Length)..]);

_root[(int)bucket] = address;
// write root back to the bucket
ref var location = ref _root[(int)bucket];

UIntPtr root;
var current = Volatile.Read(ref location);

do
{
// copy to the root first
root = current;

// write next
Unsafe.WriteUnaligned(ref destination[PreambleLength], root);

// try swap remembering current
current = Interlocked.CompareExchange(ref location, address, root);
} while (current != root);
}

public void Destroy(scoped ReadOnlySpan<byte> key, ulong hash)
Expand Down Expand Up @@ -346,7 +360,7 @@ public bool MoveNext()
return false;
}

_address = dictionary._root[_bucket];
_address = Volatile.Read(ref dictionary._root[_bucket]);
}

// Scan the bucket till it's not destroyed
Expand Down Expand Up @@ -442,20 +456,32 @@ private Page RentNewPage(bool clear)

private static uint Mix(ulong hash) => unchecked((uint)((hash >> 32) ^ hash));

private readonly object _lock = new();

private Span<byte> Write(int size, out UIntPtr addr)
{
if (BufferSize - _position < size)
// Memoize to make contention as small as possible
Page current;
int position;

lock (_lock)
{
// not enough memory
AllocateNewPage();
if (BufferSize - _position < size)
{
// not enough memory
AllocateNewPage();
}

position = _position;
current = _current;

// Amend _position so that it can be used by other threads.
_position += size;
}

// allocated before the position is changed
var span = _current.Span.Slice(_position, size);
addr = _current.Raw + (UIntPtr)_position;

_position += size;
return span;
addr = current.Raw + (UIntPtr)position;
return current.Span.Slice(position, size);
}

public void Dispose()
Expand Down

0 comments on commit a5c7d41

Please sign in to comment.