Skip to content

Commit

Permalink
Remove Synchronized Statements(Loom-friendly) (#173)
Browse files Browse the repository at this point in the history
Motivation:
Synchronized statements tend to pin virtual threads to carrier threads.

Modification:
Replaced synchronized statements with ReentrantLock

Result:
Loom-friendly
  • Loading branch information
jchrys authored Dec 22, 2023
1 parent d4a9fe0 commit cf3a302
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
Expand Down Expand Up @@ -157,6 +158,8 @@ private static final class LazyQueryCache {

private final int capacity;

private final ReentrantLock lock = new ReentrantLock();

@Nullable
private volatile QueryCache cache;

Expand All @@ -167,11 +170,14 @@ private LazyQueryCache(int capacity) {
public QueryCache get() {
QueryCache cache = this.cache;
if (cache == null) {
synchronized (this) {
lock.lock();
try {
if ((cache = this.cache) == null) {
this.cache = cache = Caches.createQueryCache(capacity);
}
return cache;
} finally {
lock.unlock();
}
}
return cache;
Expand Down
51 changes: 32 additions & 19 deletions src/main/java/io/asyncer/r2dbc/mysql/cache/PrepareBoundedCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package io.asyncer.r2dbc.mysql.cache;

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;

/**
* A bounded implementation of {@link PrepareCache} that uses synchronized methods to ensure correctness, even
* A bounded implementation of {@link PrepareCache} that uses {@link ReentrantLock} to ensure correctness, even
* it should not be used thread concurrently.
*/
final class PrepareBoundedCache extends HashMap<String, Lru.Node<Integer>> implements PrepareCache {
Expand All @@ -33,6 +34,8 @@ final class PrepareBoundedCache extends HashMap<String, Lru.Node<Integer>> imple

private final Lru<Integer> protection;

private final ReentrantLock lock = new ReentrantLock();

PrepareBoundedCache(int capacity) {
int windowSize = Math.max(1, capacity / 100);
int protectionSize = Math.max(1, (int) ((capacity - windowSize) * 0.8));
Expand All @@ -45,29 +48,39 @@ final class PrepareBoundedCache extends HashMap<String, Lru.Node<Integer>> imple
}

@Override
public synchronized Integer getIfPresent(String key) {
Lru.Node<Integer> node = super.get(key);

if (node == null) {
return null;
public Integer getIfPresent(String key) {
lock.lock();
try {
Lru.Node<Integer> node = super.get(key);

if (node == null) {
return null;
}

drainRead(node);
return node.getValue();
} finally {
lock.unlock();
}

drainRead(node);
return node.getValue();
}

@Override
public synchronized boolean putIfAbsent(String key, int value, IntConsumer evict) {
Lru.Node<Integer> wantAdd = new Lru.Node<>(key, value);
Lru.Node<Integer> present = super.putIfAbsent(key, wantAdd);

if (present == null) {
drainAdded(wantAdd, evict);
return true;
public boolean putIfAbsent(String key, int value, IntConsumer evict) {
lock.lock();
try {
Lru.Node<Integer> wantAdd = new Lru.Node<>(key, value);
Lru.Node<Integer> present = super.putIfAbsent(key, wantAdd);

if (present == null) {
drainAdded(wantAdd, evict);
return true;
}

drainRead(present);
return false;
} finally {
lock.unlock();
}

drainRead(present);
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;

abstract class LeftPadding {

Expand Down Expand Up @@ -57,6 +58,8 @@ final class RequestQueue extends ActiveStatus implements Runnable {

private final Queue<RequestTask<?>> queue = Queues.<RequestTask<?>>small().get();

private final ReentrantLock lock = new ReentrantLock();

@Nullable
private volatile RuntimeException disposed;

Expand Down Expand Up @@ -145,14 +148,17 @@ private RuntimeException requireDisposed() {
RuntimeException disposed = this.disposed;

if (disposed == null) {
synchronized (this) {
lock.lock();
try {
disposed = this.disposed;

if (disposed == null) {
this.disposed = disposed = new IllegalStateException("Request queue was disposed");
}

return disposed;
} finally {
lock.unlock();
}
}

Expand Down
20 changes: 16 additions & 4 deletions src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;

Expand Down Expand Up @@ -319,16 +320,19 @@ static final class Builder implements CodecsBuilder {

private final ByteBufAllocator allocator;

@GuardedBy("this")
@GuardedBy("lock")
private final ArrayList<Codec<?>> codecs = new ArrayList<>();

private final ReentrantLock lock = new ReentrantLock();

Builder(ByteBufAllocator allocator) {
this.allocator = allocator;
}

@Override
public CodecsBuilder addFirst(Codec<?> codec) {
synchronized (this) {
lock.lock();
try {
if (codecs.isEmpty()) {
Codec<?>[] defaultCodecs = defaultCodecs(allocator);

Expand All @@ -339,24 +343,30 @@ public CodecsBuilder addFirst(Codec<?> codec) {
} else {
codecs.add(0, codec);
}
} finally {
lock.unlock();
}
return this;
}

@Override
public CodecsBuilder addLast(Codec<?> codec) {
synchronized (this) {
lock.lock();
try {
if (codecs.isEmpty()) {
codecs.addAll(InternalArrays.asImmutableList(defaultCodecs(allocator)));
}
codecs.add(codec);
} finally {
lock.unlock();
}
return this;
}

@Override
public Codecs build() {
synchronized (this) {
lock.lock();
try {
try {
if (codecs.isEmpty()) {
return new DefaultCodecs(defaultCodecs(allocator));
Expand All @@ -366,6 +376,8 @@ public Codecs build() {
codecs.clear();
codecs.trimToSize();
}
} finally {
lock.unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package io.asyncer.r2dbc.mysql.collation;

import org.jetbrains.annotations.Nullable;

import java.nio.charset.Charset;
import java.util.concurrent.locks.ReentrantLock;

/**
* Character collation those NOT use cached {@link CharsetTarget} of MySQL, it will be initialized and cached
Expand All @@ -26,6 +29,9 @@
*/
final class LazyInitCharCollation extends AbstractCharCollation {

private final ReentrantLock lock = new ReentrantLock();

@Nullable
private volatile Charset cached;

LazyInitCharCollation(int id, String name, CharsetTarget target) {
Expand All @@ -37,7 +43,8 @@ public Charset getCharset() {
Charset cached = this.cached;

if (cached == null) {
synchronized (this) {
lock.lock();
try {
cached = this.cached;

if (cached == null) {
Expand All @@ -46,6 +53,8 @@ public Charset getCharset() {
}

return cached;
} finally {
lock.unlock();
}
}

Expand Down

0 comments on commit cf3a302

Please sign in to comment.