diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java index 09c842b9e28..cc4cd23a3c4 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java @@ -110,7 +110,7 @@ public class ProgramNotificationSubscriberService extends AbstractIdleService { private final TransactionRunner transactionRunner; private final Store store; private final RunRecordMonitorService runRecordMonitorService; - private final Service delegate; + private Service delegate; private Set programCompletionNotifiers; @Inject @@ -134,6 +134,10 @@ public class ProgramNotificationSubscriberService extends AbstractIdleService { this.store = store; this.runRecordMonitorService = runRecordMonitorService; this.programCompletionNotifiers = Collections.emptySet(); + } + + @Override + protected void startUp() throws Exception { List children = new ArrayList<>(); String topicPrefix = cConf.get(Constants.AppFabric.PROGRAM_STATUS_EVENT_TOPIC); int numPartitions = cConf.getInt(Constants.AppFabric.PROGRAM_STATUS_EVENT_NUM_PARTITIONS); @@ -145,10 +149,6 @@ public class ProgramNotificationSubscriberService extends AbstractIdleService { i -> children.add(createChildService("program.status." + i, topicPrefix + i))); } this.delegate = new CompositeService(children); - } - - @Override - protected void startUp() throws Exception { delegate.startAndWait(); }