diff --git a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java
index d0fc85b514..56ece5d733 100644
--- a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java
+++ b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java
@@ -28,6 +28,8 @@
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -63,6 +65,7 @@
  * @param Key type.
  * @param Value type.
  * @author Mark Paluch
+ * @author Tihomir Mateev
  * @since 3.0
  */
 @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -72,7 +75,7 @@ class PooledClusterConnectionProvider
     private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class);
 
     // Contains NodeId-identified and HostAndPort-identified connections.
-    private final Object stateLock = new Object();
+    private final Lock stateLock = new ReentrantLock();
 
     private final boolean debugEnabled = logger.isDebugEnabled();
 
@@ -156,8 +159,12 @@ public CompletableFuture> getConnectionAsync(Conne
     private CompletableFuture> getWriteConnection(int slot) {
 
         CompletableFuture> writer;// avoid races when reconfiguring partitions.
-        synchronized (stateLock) {
+
+        stateLock.lock();
+        try {
             writer = writers[slot];
+        } finally {
+            stateLock.unlock();
         }
 
         if (writer == null) {
@@ -177,10 +184,13 @@ private CompletableFuture> getWriteConnection(int
 
             return future.thenApply(connection -> {
 
-                synchronized (stateLock) {
+                stateLock.lock();
+                try {
                     if (writers[slot] == null) {
                         writers[slot] = CompletableFuture.completedFuture(connection);
                     }
+                } finally {
+                    stateLock.unlock();
                 }
 
                 return connection;
@@ -196,8 +206,11 @@ private CompletableFuture> getReadConnection(int s
 
         boolean cached = true;
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             readerCandidates = readers[slot];
+        } finally {
+            stateLock.unlock();
         }
 
         if (readerCandidates == null) {
@@ -293,8 +306,12 @@ public Iterator iterator() {
                 for (int i = 0; i < toCache.length; i++) {
                     toCache[i] = CompletableFuture.completedFuture(statefulRedisConnections[i]);
                 }
-                synchronized (stateLock) {
+
+                stateLock.lock();
+                try {
                     readers[slot] = toCache;
+                } finally {
+                    stateLock.unlock();
                 }
 
                 if (!orderSensitive) {
@@ -532,12 +549,15 @@ public void setPartitions(Partitions partitions) {
 
         boolean reconfigurePartitions = false;
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             if (this.partitions != null) {
                 reconfigurePartitions = true;
             }
             this.partitions = partitions;
             this.connectionFactory.setPartitions(partitions);
+        } finally {
+            stateLock.unlock();
         }
 
         if (reconfigurePartitions) {
@@ -601,8 +621,11 @@ private boolean isStale(ConnectionKey connectionKey) {
 
     @Override
     public void setAutoFlushCommands(boolean autoFlush) {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             this.autoFlushCommands = autoFlush;
+        } finally {
+            stateLock.unlock();
         }
 
         connectionProvider.forEach(connection -> connection.setAutoFlushCommands(autoFlush));
@@ -616,9 +639,12 @@ public void flushCommands() {
 
     @Override
     public void setReadFrom(ReadFrom readFrom) {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             this.readFrom = readFrom;
             Arrays.fill(readers, null);
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -643,9 +669,12 @@ long getConnectionCount() {
      */
     private void resetFastConnectionCache() {
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             Arrays.fill(writers, null);
             Arrays.fill(readers, null);
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -719,9 +748,12 @@ public ConnectionFuture> apply(ConnectionKey key)
             RedisClusterNode actualNode = targetNode;
             connection = connection.thenApply(c -> {
-                synchronized (stateLock) {
+                stateLock.lock();
+                try {
                     c.setAutoFlushCommands(autoFlushCommands);
                     c.addListener(message -> onPushMessage(actualNode, message));
+                } finally {
+                    stateLock.unlock();
                 }
 
                 return c;
             });
diff --git a/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java b/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java
index 944e7d4d3c..3f4da22af5 100644
--- a/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java
+++ b/src/main/java/io/lettuce/core/cluster/models/partitions/Partitions.java
@@ -24,6 +24,8 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import io.lettuce.core.RedisURI;
 import io.lettuce.core.cluster.SlotHash;
@@ -62,6 +64,8 @@ public class Partitions implements Collection {
 
     private static final RedisClusterNode[] EMPTY = new RedisClusterNode[SlotHash.SLOT_COUNT];
 
+    private final Lock lock = new ReentrantLock();
+
     private final List partitions = new ArrayList<>();
 
     private volatile RedisClusterNode[] slotCache = EMPTY;
@@ -166,8 +170,8 @@ private static boolean matches(RedisURI uri, String host, int port) {
      */
     public void updateCache() {
 
-        synchronized (partitions) {
-
+        lock.lock();
+        try {
             if (partitions.isEmpty()) {
                 invalidateCache();
                 return;
@@ -190,6 +194,8 @@ public void updateCache() {
             this.slotCache = slotCache;
             this.masterCache = masterCache;
             this.nodeReadView = Collections.unmodifiableCollection(readView);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -232,9 +238,12 @@ public void addPartition(RedisClusterNode partition) {
 
         LettuceAssert.notNull(partition, "Partition must not be null");
 
-        synchronized (partitions) {
+        lock.lock();
+        try {
             invalidateCache();
             partitions.add(partition);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -265,10 +274,13 @@ public void reload(List partitions) {
 
         LettuceAssert.noNullElements(partitions, "Partitions must not contain null elements");
 
-        synchronized (this.partitions) {
+        lock.lock();
+        try {
             this.partitions.clear();
             this.partitions.addAll(partitions);
             updateCache();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -304,10 +316,13 @@ public boolean addAll(Collection c) {
 
         LettuceAssert.noNullElements(c, "Partitions must not contain null elements");
 
-        synchronized (partitions) {
+        lock.lock();
+        try {
             boolean b = partitions.addAll(c);
             updateCache();
             return b;
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -321,10 +336,13 @@ public boolean addAll(Collection c) {
 
     @Override
     public boolean removeAll(Collection c) {
-        synchronized (partitions) {
+        lock.lock();
+        try {
             boolean b = getPartitions().removeAll(c);
             updateCache();
             return b;
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -339,10 +357,13 @@ public boolean removeAll(Collection c) {
 
     @Override
     public boolean retainAll(Collection c) {
-        synchronized (partitions) {
+        lock.lock();
+        try {
             boolean b = getPartitions().retainAll(c);
             updateCache();
             return b;
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -352,9 +373,12 @@ public boolean retainAll(Collection c) {
 
     @Override
    public void clear() {
-        synchronized (partitions) {
+        lock.lock();
+        try {
             getPartitions().clear();
             updateCache();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -390,12 +414,15 @@ public T[] toArray(T[] a) {
 
     @Override
     public boolean add(RedisClusterNode redisClusterNode) {
-        synchronized (partitions) {
+        lock.lock();
+        try {
             LettuceAssert.notNull(redisClusterNode, "RedisClusterNode must not be null");
 
             boolean add = getPartitions().add(redisClusterNode);
             updateCache();
             return add;
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -408,10 +435,13 @@ public boolean add(RedisClusterNode redisClusterNode) {
 
     @Override
     public boolean remove(Object o) {
-        synchronized (partitions) {
+        lock.lock();
+        try {
             boolean remove = getPartitions().remove(o);
             updateCache();
             return remove;
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java b/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java
index cc9ba5c4cb..e8106fefd6 100644
--- a/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java
+++ b/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java
@@ -506,7 +506,7 @@ public String getMessage() {
         }
 
         @Override
-        public synchronized Throwable fillInStackTrace() {
+        public Throwable fillInStackTrace() {
             return this;
         }
 
diff --git a/src/main/java/io/lettuce/core/event/command/CommandBaseEvent.java b/src/main/java/io/lettuce/core/event/command/CommandBaseEvent.java
index 9b0b3b527a..a189ddb626 100644
--- a/src/main/java/io/lettuce/core/event/command/CommandBaseEvent.java
+++ b/src/main/java/io/lettuce/core/event/command/CommandBaseEvent.java
@@ -32,9 +32,7 @@ public RedisCommand getCommand() {
      * @return shared context.
      */
     public Map getContext() {
-        synchronized (this) {
-            return context;
-        }
+        return context;
     }
 
     @Override
diff --git a/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java b/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java
index 620e546d54..e380951345 100644
--- a/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java
+++ b/src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java
@@ -3,6 +3,7 @@
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import io.lettuce.core.event.Event;
 import io.lettuce.core.internal.LettuceAssert;
@@ -23,7 +24,7 @@
  */
 class JfrEventRecorder implements EventRecorder {
 
-    private final Map, Constructor> constructorMap = new HashMap<>();
+    private final Map, Constructor> constructorMap = new ConcurrentHashMap<>();
 
     @Override
     public void record(Event event) {
@@ -54,11 +55,7 @@ public RecordableEvent start(Event event) {
 
     private Constructor getEventConstructor(Event event) throws NoSuchMethodException {
 
-        Constructor constructor;
-
-        synchronized (constructorMap) {
-            constructor = constructorMap.get(event.getClass());
-        }
+        Constructor constructor = constructorMap.get(event.getClass());
 
         if (constructor == null) {
 
@@ -73,9 +70,7 @@ private Constructor getEventConstructor(Event event) throws NoSuchMethodExcep
                 constructor.setAccessible(true);
             }
 
-            synchronized (constructorMap) {
-                constructorMap.put(event.getClass(), constructor);
-            }
+            constructorMap.put(event.getClass(), constructor);
         }
 
         return constructor;
diff --git a/src/main/java/io/lettuce/core/internal/AbstractInvocationHandler.java b/src/main/java/io/lettuce/core/internal/AbstractInvocationHandler.java
index 2eb72488ad..d5e97fc5a0 100644
--- a/src/main/java/io/lettuce/core/internal/AbstractInvocationHandler.java
+++ b/src/main/java/io/lettuce/core/internal/AbstractInvocationHandler.java
@@ -24,6 +24,8 @@
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Proxy;
 import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Abstract base class for invocation handlers.
@@ -161,6 +163,8 @@ protected static class MethodTranslator {
 
         private static final WeakHashMap, MethodTranslator> TRANSLATOR_MAP = new WeakHashMap<>(32);
 
+        private static final Lock lock = new ReentrantLock();
+
         private final Map map;
 
         private MethodTranslator(Class delegate, Class... methodSources) {
@@ -170,8 +174,11 @@ private MethodTranslator(Class delegate, Class... methodSources) {
 
         public static MethodTranslator of(Class delegate, Class... methodSources) {
 
-            synchronized (TRANSLATOR_MAP) {
+            lock.lock();
+            try {
                 return TRANSLATOR_MAP.computeIfAbsent(delegate, key -> new MethodTranslator(key, methodSources));
+            } finally {
+                lock.unlock();
             }
         }
 
diff --git a/src/main/java/io/lettuce/core/json/UnproccessedJsonValue.java b/src/main/java/io/lettuce/core/json/UnproccessedJsonValue.java
index cfeda343b1..0a00167503 100644
--- a/src/main/java/io/lettuce/core/json/UnproccessedJsonValue.java
+++ b/src/main/java/io/lettuce/core/json/UnproccessedJsonValue.java
@@ -10,6 +10,8 @@
 import io.lettuce.core.codec.StringCodec;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * A wrapper around any of the implementations of the {@link JsonValue} provided by the implementation of the {@link JsonParser}
@@ -22,6 +24,8 @@
  */
 class UnproccessedJsonValue implements JsonValue {
 
+    private final Lock lock = new ReentrantLock();
+
     private volatile JsonValue jsonValue;
 
     private final JsonParser parser;
@@ -45,7 +49,8 @@ public String toString() {
             return jsonValue.toString();
         }
 
-        synchronized (this) {
+        lock.lock();
+        try {
             if (isDeserialized()) {
                 return jsonValue.toString();
             }
@@ -53,6 +58,8 @@
             // if no deserialization took place, so no modification took place
             // in this case we can decode the source data as is
             return StringCodec.UTF8.decodeValue(unprocessedData);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -62,7 +69,8 @@ public ByteBuffer asByteBuffer() {
             return jsonValue.asByteBuffer();
         }
 
-        synchronized (this) {
+        lock.lock();
+        try {
             if (isDeserialized()) {
                 return jsonValue.asByteBuffer();
             }
@@ -70,6 +78,8 @@
             // if no deserialization took place, so no modification took place
             // in this case we can decode the source data as is
             return unprocessedData;
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -147,11 +157,14 @@ public T toObject(Class targetType) {
 
     private void lazilyDeserialize() {
         if (!isDeserialized()) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 if (!isDeserialized()) {
                     jsonValue = parser.createJsonValue(unprocessedData);
                     unprocessedData.clear();
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git a/src/main/java/io/lettuce/core/masterreplica/MasterReplicaConnectionProvider.java b/src/main/java/io/lettuce/core/masterreplica/MasterReplicaConnectionProvider.java
index 4b2732679a..685541980e 100644
--- a/src/main/java/io/lettuce/core/masterreplica/MasterReplicaConnectionProvider.java
+++ b/src/main/java/io/lettuce/core/masterreplica/MasterReplicaConnectionProvider.java
@@ -13,6 +13,8 @@
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 
 import reactor.core.publisher.Flux;
@@ -54,7 +56,7 @@ class MasterReplicaConnectionProvider {
 
     private boolean autoFlushCommands = true;
 
-    private final Object stateLock = new Object();
+    private final Lock stateLock = new ReentrantLock();
 
     private ReadFrom readFrom;
 
@@ -249,9 +251,12 @@ public void flushCommands() {
      */
     public void setAutoFlushCommands(boolean autoFlush) {
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             this.autoFlushCommands = autoFlush;
             connectionProvider.forEach(connection -> connection.setAutoFlushCommands(autoFlush));
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -270,12 +275,15 @@ protected Collection> allConnections() {
      * @param knownNodes
      */
     public void setKnownNodes(Collection knownNodes) {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
 
             this.knownNodes.clear();
             this.knownNodes.addAll(knownNodes);
 
             closeStaleConnections();
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -283,14 +291,20 @@ public void setKnownNodes(Collection knownNodes) {
      * @return the current read-from setting.
      */
     public ReadFrom getReadFrom() {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             return readFrom;
+        } finally {
+            stateLock.unlock();
         }
     }
 
     public void setReadFrom(ReadFrom readFrom) {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             this.readFrom = readFrom;
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -325,8 +339,11 @@ public ConnectionFuture> apply(ConnectionKey key)
                     builder.build());
 
             connectionFuture.thenAccept(connection -> {
-                synchronized (stateLock) {
+                stateLock.lock();
+                try {
                     connection.setAutoFlushCommands(autoFlushCommands);
+                } finally {
+                    stateLock.unlock();
                 }
             });
 
diff --git a/src/main/java/io/lettuce/core/metrics/DefaultCommandLatencyCollector.java b/src/main/java/io/lettuce/core/metrics/DefaultCommandLatencyCollector.java
index e4f71f1578..86109ab869 100644
--- a/src/main/java/io/lettuce/core/metrics/DefaultCommandLatencyCollector.java
+++ b/src/main/java/io/lettuce/core/metrics/DefaultCommandLatencyCollector.java
@@ -339,19 +339,19 @@ private NoPauseDetector() {
         }
 
         @Override
-        protected synchronized void notifyListeners(long pauseLengthNsec, long pauseEndTimeNsec) {
+        protected void notifyListeners(long pauseLengthNsec, long pauseEndTimeNsec) {
         }
 
         @Override
-        public synchronized void addListener(PauseDetectorListener listener) {
+        public void addListener(PauseDetectorListener listener) {
         }
 
         @Override
-        public synchronized void addListener(PauseDetectorListener listener, boolean isHighPriority) {
+        public void addListener(PauseDetectorListener listener, boolean isHighPriority) {
         }
 
        @Override
-        public synchronized void removeListener(PauseDetectorListener listener) {
+        public void removeListener(PauseDetectorListener listener) {
         }
 
         @Override
diff --git a/src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java b/src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java
index a882be567f..f3ab8cb85d 100644
--- a/src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java
+++ b/src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java
@@ -4,6 +4,8 @@
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 
 import org.apache.commons.pool2.BasePooledObjectFactory;
@@ -157,20 +159,31 @@ public void returnObject(T obj) {
 
         SoftReferenceObjectPool pool = new SoftReferenceObjectPool(new RedisPooledObjectFactory<>(connectionSupplier)) {
 
+            private final Lock lock = new ReentrantLock();
+
             @Override
-            public synchronized T borrowObject() throws Exception {
-                return wrapConnections ? ConnectionWrapping.wrapConnection(super.borrowObject(), poolRef.get())
-                        : super.borrowObject();
+            public T borrowObject() throws Exception {
+                lock.lock();
+                try {
+                    return wrapConnections ? ConnectionWrapping.wrapConnection(super.borrowObject(), poolRef.get())
+                            : super.borrowObject();
+                } finally {
+                    lock.unlock();
+                }
             }
 
             @Override
-            public synchronized void returnObject(T obj) throws Exception {
-
-                if (wrapConnections && obj instanceof HasTargetConnection) {
-                    super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
-                    return;
+            public void returnObject(T obj) throws Exception {
+                lock.lock();
+                try {
+                    if (wrapConnections && obj instanceof HasTargetConnection) {
+                        super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
+                        return;
+                    }
+                    super.returnObject(obj);
+                } finally {
+                    lock.unlock();
                 }
-                super.returnObject(obj);
             }
 
         };
diff --git a/src/test/java/io/lettuce/test/KeyValueStreamingAdapter.java b/src/test/java/io/lettuce/test/KeyValueStreamingAdapter.java
index 3dc99cea58..8a4e531572 100644
--- a/src/test/java/io/lettuce/test/KeyValueStreamingAdapter.java
+++ b/src/test/java/io/lettuce/test/KeyValueStreamingAdapter.java
@@ -2,6 +2,7 @@
 
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 
 import io.lettuce.core.output.KeyStreamingChannel;
 import io.lettuce.core.output.KeyValueStreamingChannel;
@@ -18,11 +19,16 @@ public class KeyValueStreamingAdapter implements KeyValueStreamingChannel<
 
     private final Map map = new LinkedHashMap<>();
 
+    private final ReentrantLock lock = new ReentrantLock();
+
     @Override
     public void onKeyValue(K key, V value) {
-        synchronized (map) {
+        lock.lock();
+        try {
             map.put(key, value);
+        } finally {
+            lock.unlock();
         }
     }
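The hunks above fall into two patterns: either a `synchronized (x) { ... }` block becomes an explicit `ReentrantLock` acquired before a `try` block and released in `finally` (with lock-free fast paths such as the `volatile jsonValue` check in `UnproccessedJsonValue`, or the `ConcurrentHashMap` in `JfrEventRecorder`, kept outside the lock), or the `synchronized` modifier is dropped outright. A minimal, self-contained sketch of the first pattern follows; the `LazyHolder` class and its `Object` payload are illustrative only and are not part of the Lettuce sources.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch (not Lettuce code): the lock/try/finally idiom with an unlocked fast path.
class LazyHolder {

    private final Lock lock = new ReentrantLock();

    private volatile Object value; // volatile makes the unlocked read safe

    Object get() {
        Object current = value;
        if (current != null) {
            return current; // fast path: no locking once initialized
        }

        lock.lock();
        try {
            if (value == null) { // re-check under the lock (double-checked initialization)
                value = new Object(); // expensive initialization would go here
            }
            return value;
        } finally {
            lock.unlock(); // always released, even if initialization throws
        }
    }

}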