Skip to content

Commit

Permalink
[PLAT-16436] Refactor platform scheduler to remove duplicate code
Browse files Browse the repository at this point in the history
Summary: Minor refactoring to reuse code.

Test Plan: Schedulers are running fine with the change.

Reviewers: cwang, amalyshev

Reviewed By: amalyshev

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D41106
  • Loading branch information
nkhogen committed Jan 10, 2025
1 parent a0424e1 commit dcf06f9
Showing 1 changed file with 47 additions and 60 deletions.
107 changes: 47 additions & 60 deletions managed/src/main/java/com/yugabyte/yw/common/PlatformScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.yugabyte.yw.models.HighAvailabilityConfig;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -31,45 +32,39 @@ public PlatformScheduler(
this.shutdownHookHandler = shutdownHookHandler;
}

public Cancellable schedule(
String name, Duration initialDelay, Duration interval, Runnable runnable) {
private Cancellable createShutdownAwareSchedule(
String name, Runnable runnable, Function<Runnable, Cancellable> scheduleFactory) {
final AtomicBoolean isRunning = new AtomicBoolean();
final Object lock = new Object();

Cancellable cancellable =
actorSystem
.scheduler()
.scheduleWithFixedDelay(
initialDelay,
interval,
() -> {
boolean shouldRun = false;
synchronized (lock) {
// Synchronized block in shutdown and this should be serialized.
shouldRun =
!shutdownHookHandler.isShutdown()
&& !HighAvailabilityConfig.isFollower()
&& isRunning.compareAndSet(false, true);
}
if (shouldRun) {
try {
runnable.run();
} finally {
isRunning.set(false);
if (shutdownHookHandler.isShutdown()) {
synchronized (lock) {
lock.notify();
}
}
}
} else {
log.warn(
"Previous run of scheduler {} is in progress, is being shut down, or YBA is"
+ " in follower mode.",
name);
}
},
executionContext);
Runnable wrappedRunnable =
() -> {
boolean shouldRun = false;
synchronized (lock) {
// Synchronized block in shutdown and this should be serialized.
shouldRun =
!shutdownHookHandler.isShutdown()
&& !HighAvailabilityConfig.isFollower()
&& isRunning.compareAndSet(false, true);
}
if (shouldRun) {
try {
runnable.run();
} finally {
isRunning.set(false);
if (shutdownHookHandler.isShutdown()) {
synchronized (lock) {
lock.notify();
}
}
}
} else {
log.warn(
"Previous run of scheduler {} is in progress, is being shut down, or YBA is in"
+ " follower mode.",
name);
}
};
Cancellable cancellable = scheduleFactory.apply(wrappedRunnable);
shutdownHookHandler.addShutdownHook(
cancellable,
can -> {
Expand All @@ -93,29 +88,21 @@ public Cancellable schedule(
return cancellable;
}

public Cancellable scheduleOnce(String name, Duration initialDelay, Runnable runnable) {
final AtomicBoolean isRunning = new AtomicBoolean();
public Cancellable schedule(
String name, Duration initialDelay, Duration interval, Runnable runnable) {
return createShutdownAwareSchedule(
name,
runnable,
r ->
actorSystem
.scheduler()
.scheduleWithFixedDelay(initialDelay, interval, r, executionContext));
}

return actorSystem
.scheduler()
.scheduleOnce(
initialDelay,
() -> {
boolean shouldRun = false;
synchronized (isRunning) {
shouldRun =
isRunning.compareAndSet(false, true) && !HighAvailabilityConfig.isFollower();
}
if (shouldRun) {
try {
runnable.run();
} finally {
isRunning.set(false);
}
} else {
log.warn("Scheduler {} did not run because YBA is in follower mode.", name);
}
},
executionContext);
public Cancellable scheduleOnce(String name, Duration initialDelay, Runnable runnable) {
return createShutdownAwareSchedule(
name,
runnable,
r -> actorSystem.scheduler().scheduleOnce(initialDelay, r, executionContext));
}
}

0 comments on commit dcf06f9

Please sign in to comment.