Skip to content

Commit

Permalink
Execute blocking operations and context worker tasks should be concur…
Browse files Browse the repository at this point in the history
…rent.

Motivation:

Execute blocking operations use the context ordered task queue shared with context worker tasks. This prevents context execute blocking tasks to be executed concurrently with worker tasks, e.g. an execute blocking task prevents virtual thread context tasks to be executed.

Changes:

The context ordered task queue is now encapsulated in the worker event executor and not anymore part of the context. The context now creates an internal task queue for execute blocking tasks. As consequence worker contexts have dedicated task queue for context tasks / execute blocking ordered tasks. Event loop contexts still have a single task queue for execute blocking ordered tasks.
  • Loading branch information
vietj committed Oct 23, 2024
1 parent ba4c372 commit 775d945
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 43 deletions.
20 changes: 13 additions & 7 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ static <T> void setResultHandler(ContextInternal ctx, Future<T> fut, Handler<Asy
final TaskQueue internalOrderedTasks;
final WorkerPool internalWorkerPool;
final WorkerPool workerPool;
final WorkerTaskQueue orderedTasks;
final WorkerTaskQueue executeBlockingTasks;

public ContextImpl(VertxInternal vertx,
int localsLength,
Expand All @@ -65,7 +65,6 @@ public ContextImpl(VertxInternal vertx,
EventExecutor executor,
WorkerPool internalWorkerPool,
WorkerPool workerPool,
WorkerTaskQueue orderedTasks,
Deployment deployment,
CloseFuture closeFuture,
ClassLoader tccl) {
Expand All @@ -80,16 +79,23 @@ public ContextImpl(VertxInternal vertx,
this.workerPool = workerPool;
this.closeFuture = closeFuture;
this.internalWorkerPool = internalWorkerPool;
this.orderedTasks = orderedTasks;
this.executeBlockingTasks = new WorkerTaskQueue();
this.internalOrderedTasks = new TaskQueue();
}

public Future<Void> close() {
Future<Void> fut;
if (closeFuture == owner.closeFuture()) {
return Future.future(p -> orderedTasks.shutdown(eventLoop, p));
fut = Future.succeededFuture();
} else {
return closeFuture.close().eventually(() -> Future.<Void>future(p -> orderedTasks.shutdown(eventLoop, p)));
fut = closeFuture.close();
}
fut = fut.eventually(() -> Future.<Void>future(p -> executeBlockingTasks.shutdown(eventLoop, p)));
if (executor instanceof WorkerExecutor) {
WorkerExecutor workerExec = (WorkerExecutor) executor;
fut = fut.eventually(() -> Future.<Void>future(p -> workerExec.taskQueue().shutdown(eventLoop, p)));
}
return fut;
}

public Deployment getDeployment() {
Expand Down Expand Up @@ -136,12 +142,12 @@ public <T> Future<T> executeBlockingInternal(Callable<T> action, boolean ordered

@Override
public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered) {
return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? orderedTasks : null);
return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? executeBlockingTasks : null);
}

@Override
public <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, boolean ordered) {
return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? orderedTasks : null);
return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? executeBlockingTasks : null);
}

@Override
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/vertx/core/impl/DuplicatedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.vertx.core.spi.tracing.VertxTracer;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

Expand Down Expand Up @@ -136,12 +135,12 @@ public <T> Future<T> executeBlockingInternal(Callable<T> action, boolean ordered

@Override
public final <T> Future<T> executeBlocking(Handler<Promise<T>> action, boolean ordered) {
return ContextImpl.executeBlocking(this, action, delegate.workerPool, ordered ? delegate.orderedTasks : null);
return ContextImpl.executeBlocking(this, action, delegate.workerPool, ordered ? delegate.executeBlockingTasks : null);
}

@Override
public final <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, boolean ordered) {
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null);
return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? delegate.executeBlockingTasks : null);
}

@Override
Expand Down
13 changes: 5 additions & 8 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -563,27 +563,24 @@ private ContextImpl createEventLoopContext(EventLoop eventLoop, CloseFuture clos
ThreadingModel threadingModel = ThreadingModel.EVENT_LOOP;
EventExecutor eventExecutor = new EventLoopExecutor(eventLoop);
WorkerPool wp = workerPool != null ? workerPool : this.workerPool;
WorkerTaskQueue taskQueue = new WorkerTaskQueue();
return createContext(threadingModel, eventLoop, closeFuture, deployment, tccl, eventExecutor, wp, taskQueue);
return createContext(threadingModel, eventLoop, closeFuture, deployment, tccl, eventExecutor, wp);
}

private ContextImpl createWorkerContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) {
WorkerTaskQueue orderedTasks = new WorkerTaskQueue();
WorkerPool wp = workerPool != null ? workerPool : this.workerPool;
return createContext(ThreadingModel.WORKER, eventLoop, closeFuture, deployment, tccl, new WorkerExecutor(wp, orderedTasks), wp, orderedTasks);
return createContext(ThreadingModel.WORKER, eventLoop, closeFuture, deployment, tccl, new WorkerExecutor(wp, new WorkerTaskQueue()), wp);
}

private ContextImpl createVirtualThreadContext(EventLoop eventLoop, CloseFuture closeFuture, Deployment deployment, ClassLoader tccl) {
if (!isVirtualThreadAvailable()) {
throw new IllegalStateException("This Java runtime does not support virtual threads");
}
WorkerTaskQueue orderedTasks = new WorkerTaskQueue();
return createContext(ThreadingModel.VIRTUAL_THREAD, eventLoop, closeFuture, deployment, tccl, new WorkerExecutor(virtualThreaWorkerPool, orderedTasks), virtualThreaWorkerPool, orderedTasks);
return createContext(ThreadingModel.VIRTUAL_THREAD, eventLoop, closeFuture, deployment, tccl, new WorkerExecutor(virtualThreaWorkerPool, new WorkerTaskQueue()), virtualThreaWorkerPool);
}


private ContextImpl createContext(ThreadingModel threadingModel, EventLoop eventLoop, CloseFuture closeFuture, Deployment deployment, ClassLoader tccl, EventExecutor eventExecutor, WorkerPool wp, WorkerTaskQueue taskQueue) {
return new ContextImpl(this, contextLocalsLength, threadingModel, eventLoop, eventExecutor, internalWorkerPool, wp, taskQueue, deployment, closeFuture, disableTCCL ? null : tccl);
private ContextImpl createContext(ThreadingModel threadingModel, EventLoop eventLoop, CloseFuture closeFuture, Deployment deployment, ClassLoader tccl, EventExecutor eventExecutor, WorkerPool wp) {
return new ContextImpl(this, contextLocalsLength, threadingModel, eventLoop, eventExecutor, internalWorkerPool, wp, deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/io/vertx/core/impl/WorkerExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
package io.vertx.core.impl;

import io.vertx.core.Future;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import io.vertx.core.spi.metrics.PoolMetrics;
Expand Down Expand Up @@ -43,10 +44,10 @@ public static io.vertx.core.impl.WorkerExecutor unwrapWorkerExecutor() {
}

private final WorkerPool workerPool;
private final TaskQueue orderedTasks;
private final WorkerTaskQueue orderedTasks;
private final ThreadLocal<Boolean> inThread = new ThreadLocal<>();

public WorkerExecutor(WorkerPool workerPool, TaskQueue orderedTasks) {
public WorkerExecutor(WorkerPool workerPool, WorkerTaskQueue orderedTasks) {
this.workerPool = workerPool;
this.orderedTasks = orderedTasks;
}
Expand Down Expand Up @@ -75,6 +76,10 @@ protected void execute() {
orderedTasks.execute(task, workerPool.executor());
}

WorkerTaskQueue taskQueue() {
return orderedTasks;
}

/**
* Suspend the current task execution until the task is resumed, the next task in the queue will be executed
* when there is one.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ public WorkerPool getPool() {
}
ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
ContextImpl impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextImpl) context;
return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.orderedTasks : null);
return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.executeBlockingTasks : null);
}

@Override
public <T> Future<@Nullable T> executeBlocking(Callable<T> blockingCodeHandler, boolean ordered) {
ContextInternal context = vertx.getOrCreateContext();
ContextImpl impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextImpl) context;
return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.orderedTasks : null);
return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.executeBlockingTasks : null);
}

public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> asyncResultHandler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public static ContextInternal create(Vertx vertx) {
EXECUTOR,
impl.internalWorkerPool,
impl.workerPool,
new WorkerTaskQueue(),
null,
null,
Thread.currentThread().getContextClassLoader()
Expand Down
21 changes: 21 additions & 0 deletions src/test/java/io/vertx/core/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,27 @@ public void testInternalExecuteBlockingWithQueue(List<Consumer<Handler<Promise<O
await();
}

@Test
public void testExecuteBlockingUseItsOwnTaskQueue() {
Context ctx = ((VertxInternal)vertx).createWorkerContext();
CountDownLatch latch = new CountDownLatch(1);
ctx.runOnContext(v -> {
ctx.executeBlocking(() -> {
latch.countDown();
return 0;
});
boolean timedOut;
try {
timedOut = !latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertFalse(timedOut);
testComplete();
});
await();
}

@Test
public void testEventLoopContextDispatchReportsFailure() {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
Expand Down
38 changes: 18 additions & 20 deletions src/test/java/io/vertx/core/NamedWorkerPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,27 +164,25 @@ public void testUnordered() throws Exception {
public void testUseDifferentExecutorWithSameTaskQueue() throws Exception {
int count = 10;
waitFor(count);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
WorkerExecutor exec = vertx.createSharedWorkerExecutor("vert.x-the-executor");
Thread startThread = Thread.currentThread();
AtomicReference<Thread> currentThread = new AtomicReference<>();
for (int i = 0;i < count;i++) {
int val = i;
exec.executeBlocking(fut -> {
Thread current = Thread.currentThread();
assertNotSame(startThread, current);
if (val == 0) {
assertNull(currentThread.getAndSet(current));
} else {
assertSame(current, currentThread.get());
}
fut.complete();
}, true, onSuccess(v -> complete()));
WorkerExecutor exec = vertx.createSharedWorkerExecutor("vert.x-the-executor");
Thread startThread = Thread.currentThread();
AtomicReference<Thread> currentThread = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0;i < count;i++) {
int val = i;
exec.executeBlocking(() -> {
Thread current = Thread.currentThread();
assertNotSame(startThread, current);
if (val == 0) {
assertNull(currentThread.getAndSet(current));
awaitLatch(latch);
} else {
assertSame(current, currentThread.get());
}
}
}, new DeploymentOptions().setWorker(true), onSuccess(id -> {}));
return null;
}, true).onComplete(onSuccess(v -> complete()));
latch.countDown();
}
await();
}

Expand Down

0 comments on commit 775d945

Please sign in to comment.