Skip to content

Commit

Permalink
Fix wrapping executable service multiple times (#11490)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstepanov authored Jan 8, 2025
1 parent 1ec7414 commit 67eb48d
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,47 +45,48 @@ final class ExecutorServiceInstrumenter implements BeanCreatedEventListener<Exec
*/
@Override
public ExecutorService onCreated(BeanCreatedEvent<ExecutorService> event) {
Class<ExecutorService> beanType = event.getBeanDefinition().getBeanType();
if (beanType == ExecutorService.class) {
ExecutorService executorService = event.getBean();
if (executorService instanceof ScheduledExecutorService service) {
return new InstrumentedScheduledExecutorService() {
@Override
public ScheduledExecutorService getTarget() {
return service;
}
Class<? extends ExecutorService> beanType = event.getBeanDefinition().getBeanType();
if (beanType != ExecutorService.class) {
return event.getBean();
}
ExecutorService executorService = event.getBean();
if (executorService instanceof InstrumentedExecutorService) {
return executorService;
}
if (executorService instanceof ScheduledExecutorService service) {
return new InstrumentedScheduledExecutorService() {
@Override
public ScheduledExecutorService getTarget() {
return service;
}

@Override
public <T> Callable<T> instrument(Callable<T> task) {
return PropagatedContext.wrapCurrent(task);
}
@Override
public <T> Callable<T> instrument(Callable<T> task) {
return PropagatedContext.wrapCurrent(task);
}

@Override
public Runnable instrument(Runnable command) {
return PropagatedContext.wrapCurrent(command);
}
};
} else {
return new InstrumentedExecutorService() {
@Override
public ExecutorService getTarget() {
return executorService;
}
@Override
public Runnable instrument(Runnable command) {
return PropagatedContext.wrapCurrent(command);
}
};
}
return new InstrumentedExecutorService() {
@Override
public ExecutorService getTarget() {
return executorService;
}

@Override
public <T> Callable<T> instrument(Callable<T> task) {
return PropagatedContext.wrapCurrent(task);
}
@Override
public <T> Callable<T> instrument(Callable<T> task) {
return PropagatedContext.wrapCurrent(task);
}

@Override
public Runnable instrument(Runnable command) {
return PropagatedContext.wrapCurrent(command);
}
};
@Override
public Runnable instrument(Runnable command) {
return PropagatedContext.wrapCurrent(command);
}
} else {
return event.getBean();
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.micronaut.context

import io.micronaut.core.propagation.PropagatedContext
import io.micronaut.core.propagation.ThreadPropagatedContextElement
import io.micronaut.inject.qualifiers.Qualifiers
import spock.lang.Requires
import spock.lang.Specification
import spock.util.environment.Jvm

import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicInteger

@Requires({ Jvm.current.isJava21Compatible() })
class PropagatedContext2Spec extends Specification {

void 'test PropagatedContext are correctly called for ExecutorServices io, virtual and blocking'() {
given:
ApplicationContext applicationContext = ApplicationContext.run()
ExecutorService io = applicationContext.getBean(ExecutorService, Qualifiers.byName("io"))
ExecutorService virtual = applicationContext.getBean(ExecutorService, Qualifiers.byName("virtual"))
ExecutorService blocking = applicationContext.getBean(ExecutorService, Qualifiers.byName("blocking"))

and:
TestPropagatedContext contextForIo = new TestPropagatedContext("io")
TestPropagatedContext contextForVirtual = new TestPropagatedContext("virtual")
TestPropagatedContext contextForBlocking = new TestPropagatedContext("blocking")

when:
println("---------")
println("Running IO ExecutorService:")
try (PropagatedContext.Scope ignored = PropagatedContext.getOrEmpty().plus(contextForIo).propagate()) {
io.submit {
println("Executing IO Thread Service")
}.get()
}

println("---------")
println("Running Virtual ExecutorService:")
try (PropagatedContext.Scope ignored = PropagatedContext.getOrEmpty().plus(contextForVirtual).propagate()) {
virtual.submit {
println("Executing Virtual Thread Service")
}.get()
}

println("---------")
println("Running Blocking ExecutorService:")
try (PropagatedContext.Scope ignored = PropagatedContext.getOrEmpty().plus(contextForBlocking).propagate()) {
blocking.submit {
println("Executing Blocking Thread Service")
}.get()
}

then: "Should be called 1x on the propagate() method and 1x by the ExecutorServiceInstrumenter"
contextForIo.state() == 1

and: "Should be called 1x on the propagate() method and 1x by the ExecutorServiceInstrumenter"
contextForVirtual.state() == 1

and: "Should be called 1x on the propagate() method and 1x by the ExecutorServiceInstrumenter"
contextForBlocking.state() == 1

cleanup:
applicationContext.stop()
}

class TestPropagatedContext implements ThreadPropagatedContextElement<Integer> {

private final String name
private AtomicInteger counter = new AtomicInteger(0)

TestPropagatedContext(String name) {
this.name = name
}

Integer updateThreadContext() {
int value = counter.incrementAndGet();
println("Updating thread context for $name: $value")
return value;
}

void restoreThreadContext(Integer oldState) {
println("Restoring thread context for $name: $oldState")
}

Integer state() {
return this.counter.get()
}
}

}

0 comments on commit 67eb48d

Please sign in to comment.