From 1e004c4ede513185c24a7f227b21f583fbebdfa0 Mon Sep 17 00:00:00 2001 From: Naorem Khogendro Singh Date: Wed, 15 Jan 2025 14:36:13 -0800 Subject: [PATCH] [PLAT-16468] Both instances can go into standby if sync happens from the demoted peer right after promotion Summary: This fixes the original issue + more robustness + cleanup to remove deprecated methods as much as possible. Test Plan: 1. Modified code to run sync right after demotion. Demotion works fine. 2. Shutdown the active. Make the standby active forcefully. Start the old active YBA. After the next sync in the background,the old YBA transitions to standby. Will be testing more scenarios. This diff is deployed on https://10.9.121.30/ and https://10.9.89.26/ Reviewers: muthu, nbhatia, dshubin Reviewed By: muthu Subscribers: yugaware Differential Revision: https://phorge.dev.yugabyte.com/D41266 --- .../yw/common/CustomerTaskManager.java | 6 +- .../common/ha/PlatformReplicationHelper.java | 154 ++++--- .../common/ha/PlatformReplicationManager.java | 413 +++++++++++++----- .../yugabyte/yw/common/utils/FileUtils.java | 5 + .../yugabyte/yw/controllers/HAController.java | 62 +-- .../yw/controllers/InternalHAController.java | 134 ++---- .../PlatformInstanceController.java | 47 +- .../PlatformReplicationController.java | 56 ++- .../yw/models/HighAvailabilityConfig.java | 9 +- .../yugabyte/yw/models/PlatformInstance.java | 6 +- .../ha/PlatformReplicationManagerTest.java | 6 +- .../yw/controllers/HAControllerTest.java | 7 +- .../controllers/InternalHAControllerTest.java | 22 +- .../PlatformInstanceControllerTest.java | 31 +- .../yw/models/HighAvailabilityConfigTest.java | 14 +- 15 files changed, 598 insertions(+), 374 deletions(-) diff --git a/managed/src/main/java/com/yugabyte/yw/common/CustomerTaskManager.java b/managed/src/main/java/com/yugabyte/yw/common/CustomerTaskManager.java index d93ee2809b6d..42bb2ef8d87d 100644 --- a/managed/src/main/java/com/yugabyte/yw/common/CustomerTaskManager.java +++ b/managed/src/main/java/com/yugabyte/yw/common/CustomerTaskManager.java @@ -51,6 +51,7 @@ import com.yugabyte.yw.models.Customer; import com.yugabyte.yw.models.CustomerTask; import com.yugabyte.yw.models.CustomerTask.TargetType; +import com.yugabyte.yw.models.HighAvailabilityConfig; import com.yugabyte.yw.models.PendingConsistencyCheck; import com.yugabyte.yw.models.Restore; import com.yugabyte.yw.models.RestoreKeyspace; @@ -358,7 +359,6 @@ public void handleAllPendingTasks() { CustomerTask customerTask = CustomerTask.get(row.getLong("customer_task_id")); handlePendingTask(customerTask, taskInfo); }); - for (Customer customer : Customer.getAll()) { // Change the DeleteInProgress backups state to QueuedForDeletion Backup.findAllBackupWithState( @@ -460,6 +460,10 @@ private void enableLoadBalancer(Universe universe) { } public void handleAutoRetryAbortedTasks() { + if (HighAvailabilityConfig.isFollower()) { + LOG.info("Skipping auto-retry of tasks because this YBA is a follower"); + return; + } Duration timeWindow = confGetter.getGlobalConf(GlobalConfKeys.autoRetryTasksOnYbaRestartTimeWindow); if (timeWindow.isZero()) { diff --git a/managed/src/main/java/com/yugabyte/yw/common/ha/PlatformReplicationHelper.java b/managed/src/main/java/com/yugabyte/yw/common/ha/PlatformReplicationHelper.java index 8981e8f4ea41..957fc063b4f4 100644 --- a/managed/src/main/java/com/yugabyte/yw/common/ha/PlatformReplicationHelper.java +++ b/managed/src/main/java/com/yugabyte/yw/common/ha/PlatformReplicationHelper.java @@ -17,7 +17,12 @@ import com.typesafe.config.ConfigException; import com.typesafe.config.ConfigValue; import com.typesafe.config.ConfigValueFactory; -import com.yugabyte.yw.common.*; +import com.yugabyte.yw.common.AppConfigHelper; +import com.yugabyte.yw.common.PrometheusConfigHelper; +import com.yugabyte.yw.common.PrometheusConfigManager; +import com.yugabyte.yw.common.ShellProcessHandler; +import com.yugabyte.yw.common.ShellResponse; +import com.yugabyte.yw.common.SwamperHelper; import com.yugabyte.yw.common.config.GlobalConfKeys; import com.yugabyte.yw.common.config.RuntimeConfGetter; import com.yugabyte.yw.common.config.impl.SettableRuntimeConfigFactory; @@ -32,6 +37,7 @@ import java.io.IOException; import java.net.URI; import java.net.URL; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; @@ -41,23 +47,23 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; import org.apache.pekko.actor.Cancellable; import org.apache.velocity.Template; import org.apache.velocity.VelocityContext; import org.apache.velocity.app.VelocityEngine; import org.apache.velocity.runtime.RuntimeConstants; import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import play.libs.Json; @Singleton +@Slf4j public class PlatformReplicationHelper { - private static final Logger LOG = LoggerFactory.getLogger(PlatformReplicationHelper.class); public static final String BACKUP_DIR = "platformBackups"; public static final String REPLICATION_DIR = "platformReplication"; static final String BACKUP_FILE_PATTERN = "backup_*.tgz"; + static final String LOCAL_HA_CONFIG_JSON_FILE = "local_ha_config.json"; // Config keys: private static final String REPLICATION_SCHEDULE_ENABLED_KEY = @@ -78,8 +84,6 @@ public class PlatformReplicationHelper { private final SettableRuntimeConfigFactory runtimeConfigFactory; - private final ApiHelper apiHelper; - private final PlatformInstanceClientFactory remoteClientFactory; private final MetricUrlProvider metricUrlProvider; @@ -94,7 +98,6 @@ public class PlatformReplicationHelper { public PlatformReplicationHelper( RuntimeConfGetter confGetter, SettableRuntimeConfigFactory runtimeConfigFactory, - ApiHelper apiHelper, PlatformInstanceClientFactory remoteClientFactory, ShellProcessHandler shellProcessHandler, MetricUrlProvider metricUrlProvider, @@ -102,7 +105,6 @@ public PlatformReplicationHelper( PrometheusConfigManager prometheusConfigManager) { this.confGetter = confGetter; this.runtimeConfigFactory = runtimeConfigFactory; - this.apiHelper = apiHelper; this.remoteClientFactory = remoteClientFactory; this.shellProcessHandler = shellProcessHandler; this.metricUrlProvider = metricUrlProvider; @@ -238,40 +240,43 @@ private void writeFederatedPrometheusConfig(String remoteAddr, File file, boolea // Merge the template with the context. template.merge(context, writer); } catch (Exception e) { - LOG.error("Error creating federated prometheus config file"); + log.error("Error creating federated prometheus config file"); } } + // This makes calls to the remote instances to demote. boolean demoteRemoteInstance(PlatformInstance remoteInstance, boolean promote) { + if (remoteInstance.getIsLocal()) { + log.warn("Cannot perform demoteRemoteInstance action on a local instance"); + return false; + } HighAvailabilityConfig config = remoteInstance.getConfig(); try (PlatformInstanceClient client = this.remoteClientFactory.getClient( config.getClusterKey(), remoteInstance.getAddress(), config.getAcceptAnyCertificateOverrides())) { - if (remoteInstance.getIsLocal()) { - LOG.warn("Cannot perform demoteRemoteInstance action on a local instance"); - return false; + if (remoteInstance.getIsLeader()) { + // Ensure all local records for remote instances are set to follower state. + remoteInstance.demote(); } - - // Ensure all local records for remote instances are set to follower state. - remoteInstance.demote(); - return config .getLocal() .map( localInstance -> { // Send step down request to remote instance. + log.info( + "Demoting remote instance {} in favor of {}", + remoteInstance.getAddress(), + localInstance.getAddress()); client.demoteInstance( localInstance.getAddress(), config.getLastFailover().getTime(), promote); - return true; }) .orElse(false); } catch (Exception e) { - LOG.error("Error demoting remote platform instance {}", remoteInstance.getAddress(), e); + log.error("Error demoting remote platform instance {}", remoteInstance.getAddress(), e); } - return false; } @@ -300,7 +305,7 @@ boolean exportPlatformInstances(HighAvailabilityConfig config, String remoteInst client.syncInstances(config.getLastFailover().getTime(), instancesJson); return true; } catch (Exception e) { - LOG.error( + log.error( "Error exporting local platform instances to remote instance " + remoteInstanceAddr, e); } return false; @@ -308,19 +313,19 @@ boolean exportPlatformInstances(HighAvailabilityConfig config, String remoteInst void switchPrometheusToFederated(URL remoteAddr) { try { - LOG.info("Switching local prometheus to federated or updating it"); + log.info("Switching local prometheus to federated or updating it"); File configFile = prometheusConfigHelper.getPrometheusConfigFile(); File configDir = configFile.getParentFile(); File previousConfigFile = new File(configDir, "previous_prometheus.yml"); if (!configDir.exists() && !configDir.mkdirs()) { - LOG.warn("Could not create output dir {}", configDir); + log.warn("Could not create output dir {}", configDir); return; } // Move the old file if it hasn't already been moved. if (configFile.exists() && !previousConfigFile.exists()) { - LOG.info("Creating previous_prometheus.yml from existing prometheus.yml"); + log.info("Creating previous_prometheus.yml from existing prometheus.yml"); FileUtils.moveFile(configFile.toPath(), previousConfigFile.toPath()); } @@ -333,18 +338,18 @@ void switchPrometheusToFederated(URL remoteAddr) { String federatedPoint = remoteAddr.getHost() + ":" + federatedURL.getPort(); boolean https = federatedURL.getScheme().equalsIgnoreCase("https"); this.writeFederatedPrometheusConfig(federatedPoint, configFile, https); - LOG.info("Wrote federated prometheus config."); + log.info("Wrote federated prometheus config."); // Reload the config. prometheusConfigHelper.reloadPrometheusConfig(); } catch (Exception e) { - LOG.error("Error switching prometheus config to read from {}", remoteAddr.getHost(), e); + log.error("Error switching prometheus config to read from {}", remoteAddr.getHost(), e); } } void switchPrometheusToStandalone() { try { - LOG.info("Switching prometheus to standalone."); + log.info("Switching prometheus to standalone."); File configFile = prometheusConfigHelper.getPrometheusConfigFile(); File configDir = configFile.getParentFile(); File previousConfigFile = new File(configDir, "previous_prometheus.yml"); @@ -356,9 +361,9 @@ void switchPrometheusToStandalone() { FileUtils.moveFile(previousConfigFile.toPath(), configFile.toPath()); prometheusConfigHelper.reloadPrometheusConfig(); prometheusConfigManager.updateK8sScrapeConfigs(); - LOG.info("Moved previous_prometheus.yml to prometheus.yml"); + log.info("Moved previous_prometheus.yml to prometheus.yml"); } catch (Exception e) { - LOG.error("Error switching prometheus config to standalone", e); + log.error("Error switching prometheus config to standalone", e); } } @@ -439,7 +444,7 @@ void cleanupBackups(List backups, int numToRetain) { return; } - LOG.info("Garbage collecting {} backups", numBackups - numToRetain); + log.info("Garbage collecting {} backups", numBackups - numToRetain); backups.subList(0, numBackups - numToRetain).forEach(File::delete); } @@ -448,7 +453,7 @@ public Optional getMostRecentBackup() { return FileUtils.listFiles(this.getBackupDir(), BACKUP_FILE_PATTERN).stream() .max(Comparator.comparingLong(File::lastModified)); } catch (Exception exception) { - LOG.error("Could not locate recent backup", exception); + log.error("Could not locate recent backup", exception); } return Optional.empty(); @@ -459,19 +464,19 @@ public void cleanupCreatedBackups() { List backups = FileUtils.listFiles(this.getBackupDir(), BACKUP_FILE_PATTERN); // Keep 3 most recent backups to avoid interference between continuous backups and HA this.cleanupBackups(backups, 3); - } catch (IOException ioException) { - LOG.warn("Failed to list or delete backups"); + } catch (IOException e) { + log.warn("Failed to list or delete backups", e); } } boolean syncToRemoteInstance(PlatformInstance remoteInstance) { HighAvailabilityConfig config = remoteInstance.getConfig(); String remoteAddr = remoteInstance.getAddress(); - LOG.debug("Syncing data to " + remoteAddr + "..."); + log.debug("Syncing data to {}...", remoteAddr); // Ensure that the remote instance is demoted if this instance is the most current leader. if (!this.demoteRemoteInstance(remoteInstance, false)) { - LOG.error("Error demoting remote instance " + remoteAddr); + log.error("Error demoting remote instance {}", remoteAddr); return false; } @@ -484,61 +489,72 @@ List listBackups(URL leader) { Path backupDir = this.getReplicationDirFor(leader.getHost()); if (!backupDir.toFile().exists() || !backupDir.toFile().isDirectory()) { - LOG.debug(String.format("%s directory does not exist", backupDir.toFile().getName())); - - return new ArrayList<>(); + log.debug(String.format("%s directory does not exist", backupDir.toFile().getName())); + return new ArrayList<>(1); } return FileUtils.listFiles(backupDir, PlatformReplicationHelper.BACKUP_FILE_PATTERN); } catch (Exception e) { - LOG.error("Error listing backups for platform instance {}", leader.getHost(), e); - + log.error("Error listing backups for platform instance {}", leader.getHost(), e); return new ArrayList<>(); } } - void cleanupReceivedBackups(URL leader, int numToRetain) { - List backups = this.listBackups(leader); - this.cleanupBackups(backups, numToRetain); - } - - Optional processImportedInstance(PlatformInstance i) { - Optional config = HighAvailabilityConfig.get(); - Optional existingInstance = PlatformInstance.getByAddress(i.getAddress()); - if (config.isPresent()) { - // Ensure the previous leader is marked as a follower to avoid uniqueness violation. - if (i.getIsLeader()) { - Optional existingLeader = config.get().getLeader(); - if (existingLeader.isPresent() - && !existingLeader.get().getAddress().equals(i.getAddress())) { - existingLeader.get().demote(); - } + // Save the HA config to a file before it is wiped out in backup restore. + Path saveLocalHighAvailabilityConfig(HighAvailabilityConfig config) { + Path localConfigDir = getReplicationDirFor("localhost"); + Path localHaConfigPath = getReplicationDirFor("localhost").resolve(LOCAL_HA_CONFIG_JSON_FILE); + try { + localConfigDir.toFile().mkdirs(); + File file = localConfigDir.resolve(LOCAL_HA_CONFIG_JSON_FILE).toFile(); + if (file.exists()) { + file.delete(); } + Json.mapper().writeValue(file, config); + return localHaConfigPath; + } catch (Exception e) { + log.error("Failed to write local HA config to file {}", localHaConfigPath, e); + throw new RuntimeException(e); + } + } - if (existingInstance.isPresent()) { - // Since we sync instances after sending backups, the leader instance has the source of - // truth as to when the last backup has been successfully sent to followers. - existingInstance.get().setLastBackup(i.getLastBackup()); - existingInstance.get().setIsLeader(i.getIsLeader()); - existingInstance.get().update(); - i = existingInstance.get(); - } else { - i.setIsLocal(false); - i.setConfig(config.get()); - i.save(); + // Read the HA config from the file. + Optional maybeGetLocalHighAvailabilityConfig() { + File localHaConfigFile = + getReplicationDirFor("localhost").resolve(LOCAL_HA_CONFIG_JSON_FILE).toFile(); + try { + if (localHaConfigFile.exists() && localHaConfigFile.isFile()) { + HighAvailabilityConfig config = + Json.mapper().readValue(localHaConfigFile, HighAvailabilityConfig.class); + return Optional.of(config); } + } catch (Exception e) { + log.warn("Failed to read local HA config from file {}", localHaConfigFile, e); + } + return Optional.empty(); + } - return Optional.of(i); + // Delete the local HA config file. + boolean deleteLocalHighAvailabilityConfig() { + Path localHaConfigPath = getReplicationDirFor("localhost").resolve(LOCAL_HA_CONFIG_JSON_FILE); + try { + return Files.deleteIfExists(localHaConfigPath); + } catch (IOException e) { + log.warn("Fail to delete the local HA config file {}", localHaConfigPath, e); } + return false; + } - return Optional.empty(); + void cleanupReceivedBackups(URL leader, int numToRetain) { + List backups = this.listBackups(leader); + this.cleanupBackups(backups, numToRetain); } public synchronized ShellResponse runCommand(T params) { List commandArgs = params.getCommandArgs(); Map extraVars = params.getExtraVars(); - LOG.debug("Command to run: [" + String.join(" ", commandArgs) + "]"); + log.debug("Command to run: [" + String.join(" ", commandArgs) + "]"); boolean logCmdOutput = isBackupScriptOutputEnabled(); return shellProcessHandler.run(commandArgs, extraVars, logCmdOutput); diff --git a/managed/src/main/java/com/yugabyte/yw/common/ha/PlatformReplicationManager.java b/managed/src/main/java/com/yugabyte/yw/common/ha/PlatformReplicationManager.java index f4ef8708db79..a50dc11c2d09 100644 --- a/managed/src/main/java/com/yugabyte/yw/common/ha/PlatformReplicationManager.java +++ b/managed/src/main/java/com/yugabyte/yw/common/ha/PlatformReplicationManager.java @@ -18,6 +18,8 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import com.yugabyte.yw.common.AppConfigHelper; +import com.yugabyte.yw.common.ConfigHelper; +import com.yugabyte.yw.common.ConfigHelper.ConfigType; import com.yugabyte.yw.common.PlatformScheduler; import com.yugabyte.yw.common.PlatformServiceException; import com.yugabyte.yw.common.PrometheusConfigHelper; @@ -26,6 +28,8 @@ import com.yugabyte.yw.common.utils.FileUtils; import com.yugabyte.yw.models.HighAvailabilityConfig; import com.yugabyte.yw.models.PlatformInstance; +import io.ebean.DB; +import io.ebean.annotation.Transactional; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import java.io.File; @@ -36,12 +40,14 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -68,6 +74,8 @@ public class PlatformReplicationManager { private final PrometheusConfigHelper prometheusConfigHelper; + private final ConfigHelper configHelper; + private static final String INSTANCE_ADDRESS_LABEL = "instance_address"; public static final Gauge HA_LAST_BACKUP_TIME = @@ -75,8 +83,6 @@ public class PlatformReplicationManager { .labelNames(INSTANCE_ADDRESS_LABEL) .register(CollectorRegistry.defaultRegistry); - private static final String BACKUP_SIZE_LABEL = "last_backup_size"; - public static final Gauge HA_LAST_BACKUP_SIZE = Gauge.build("yba_ha_last_backup_size_mb", "Last backup size for remote instances") .register(CollectorRegistry.defaultRegistry); @@ -86,19 +92,21 @@ public PlatformReplicationManager( PlatformScheduler platformScheduler, PlatformReplicationHelper replicationHelper, FileDataService fileDataService, - PrometheusConfigHelper prometheusConfigHelper) { + PrometheusConfigHelper prometheusConfigHelper, + ConfigHelper configHelper) { this.platformScheduler = platformScheduler; this.replicationHelper = replicationHelper; this.fileDataService = fileDataService; this.prometheusConfigHelper = prometheusConfigHelper; - this.schedule = new AtomicReference<>(null); + this.configHelper = configHelper; + this.schedule = new AtomicReference<>(); } private Cancellable getSchedule() { return this.schedule.get(); } - public void start() { + public synchronized void start() { if (replicationHelper.isBackupScheduleRunning(this.getSchedule())) { log.warn("Platform backup schedule is already started"); return; @@ -116,7 +124,7 @@ public void start() { } } - public void stop() { + public synchronized void stop() { if (!replicationHelper.isBackupScheduleRunning(this.getSchedule())) { log.debug("Platform backup schedule is already stopped"); return; @@ -125,6 +133,7 @@ public void stop() { if (!this.getSchedule().cancel()) { log.warn("Unknown error occurred stopping platform backup schedule"); } + deleteLocalHighAvailabilityConfig(); } public void init() { @@ -179,63 +188,172 @@ public JsonNode getBackupInfo() { replicationHelper.isBackupScheduleRunning(this.getSchedule())); } - public void demoteLocalInstance(PlatformInstance localInstance, String leaderAddr) + /** Validates that the request coming from the remote host to change the leader is not stale. */ + public void validateSwitchLeaderRequestForStaleness( + HighAvailabilityConfig config, String requestLeaderAddr, Date requestLastFailover) { + Date localLastFailover = config.getLastFailover(); + if (localLastFailover == null) { + log.debug("No failover has happened because last failover timestamp is not set"); + return; + } + log.debug( + "Local last failover time='{}' and request failover time='{}' from remote host='{}'", + localLastFailover, + requestLastFailover, + requestLeaderAddr); + int result = localLastFailover.compareTo(requestLastFailover); + if (result < 0) { + return; + } + if (result == 0) { + // Ensure the request originates from the same leader if the timestamp is the same. + PlatformInstance localLeader = config.getLeader().orElse(null); + if (localLeader == null || localLeader.getAddress().equals(requestLeaderAddr)) { + return; + } + } + log.error( + "Rejecting request time={} from {} due to more recent last failover time={}", + requestLastFailover, + requestLeaderAddr, + localLastFailover); + throw new PlatformServiceException( + BAD_REQUEST, "Cannot accept request from stale leader " + requestLeaderAddr); + } + + /** Demote the local instance that is invoked by a remote peer. */ + @Transactional + public synchronized void demoteLocalInstance( + HighAvailabilityConfig config, + PlatformInstance localInstance, + String requestLeaderAddr, + Date requestLastFailover) throws MalformedURLException { - log.info("Demoting local instance."); + log.info( + "Demoting local instance {} in favor of leader {}", + localInstance.getAddress(), + requestLeaderAddr); if (!localInstance.getIsLocal()) { throw new RuntimeException("Cannot perform this action on a remote instance"); } + validateSwitchLeaderRequestForStaleness(config, requestLeaderAddr, requestLastFailover); + + if (localInstance.getAddress().equals(requestLeaderAddr)) { + log.warn("Detected partial promotion failure after backup restoration"); + if (!updateLocalInstanceAfterRestore(config)) { + throw new RuntimeException( + String.format( + "Remote address %s is same as the local address %s. It cannot be fixed", + localInstance.getAddress(), requestLeaderAddr)); + } + localInstance = config.getLocal().get(); + } + + config.updateLastFailover(requestLastFailover); // Stop the old backup schedule. - this.stopAndDisable(); + stopAndDisable(); // Demote the local instance to follower. localInstance.demote(); + // Set the leader locally. + PlatformInstance.getByAddress(requestLeaderAddr) + .ifPresent( + i -> { + i.setIsLeader(true); + i.update(); + }); + // Try switching local prometheus to read from the reported leader. - replicationHelper.switchPrometheusToFederated(new URL(leaderAddr)); + replicationHelper.switchPrometheusToFederated(new URL(requestLeaderAddr)); + + String version = + configHelper + .getConfig(ConfigType.YugawareMetadata) + .getOrDefault("version", "UNKNOWN") + .toString(); + localInstance.setYbaVersion(version); + localInstance.update(); } - public void promoteLocalInstance(PlatformInstance newLeader) { - log.info("Promoting local instance to active."); - HighAvailabilityConfig config = newLeader.getConfig(); - Optional previousLocal = config.getLocal(); + @VisibleForTesting + public boolean updateLocalInstanceAfterRestore(HighAvailabilityConfig config) { + AtomicBoolean updated = new AtomicBoolean(); + Optional localConfig = maybeGetLocalHighAvailabilityConfig(); + PlatformInstance localInstance = + localConfig.isPresent() ? localConfig.get().getLocal().orElse(null) : null; + if (localInstance != null) { + config.getInstances().stream() + .sorted(Comparator.comparing(PlatformInstance::getIsLocal).reversed()) + .forEach( + i -> { + log.debug( + "Updating instance {}(uuid={}, isLocal={}, isLeader={})", + i.getAddress(), + i.getUuid(), + i.getIsLocal(), + i.getIsLeader()); + boolean isLocal = i.getAddress().equals(localInstance.getAddress()); + i.updateIsLocal(isLocal); + if (isLocal) { + updated.set(isLocal); + } + try { + // Clear out any old backups. + log.info("Cleaning up received backups."); + replicationHelper.cleanupReceivedBackups(new URL(i.getAddress()), 0); + } catch (MalformedURLException ignored) { + } + }); + } + return updated.get(); + } - if (!previousLocal.isPresent()) { + public synchronized void promoteLocalInstance(PlatformInstance newLeader) { + log.info("Promoting local instance {} to active.", newLeader.getAddress()); + HighAvailabilityConfig config = newLeader.getConfig(); + // Update is_local after the backup is restored. + if (!config.getLocal().isPresent() || !updateLocalInstanceAfterRestore(config)) { + // It must update a local instance. throw new RuntimeException("No local instance associated with backup being restored"); } - - // Update which instance should be local. - previousLocal.get().updateIsLocal(false); - config - .getInstances() - .forEach( - i -> { - i.updateIsLocal(i.getUuid().equals(newLeader.getUuid())); - try { - // Clear out any old backups. - log.info("Cleaning up received backups."); - replicationHelper.cleanupReceivedBackups(new URL(i.getAddress()), 0); - } catch (MalformedURLException ignored) { - } - }); - - // Mark the failover timestamp. - config.updateLastFailover(); + // Promote the new local leader first because the remote demotion response is ignored for + // eventual consistency. Otherwise, all of them be in standby if local promotion is done later. + persistLocalInstancePromotion(config, newLeader); // Attempt to ensure all remote instances are in follower state. // Remotely demote any instance reporting to be a leader. config .getRemoteInstances() .forEach( instance -> { - log.info("Demoting remote instance {}", instance.getAddress()); - replicationHelper.demoteRemoteInstance(instance, true); + log.info( + "Demoting remote instance {} in favor of {}", + instance.getAddress(), + newLeader.getAddress()); + // As the error is not propagated, there can be split brain due to communication + // failure that will be fixed ultimately when the communication is restored due to + // background sync. + if (!replicationHelper.demoteRemoteInstance(instance, true)) { + log.warn("Could not demote remote instance {}", instance.getAddress()); + } }); - // Promote the new local leader. - // we need to refresh because i.setIsLocalAndUpdate updated the underlying db bypassing - // newLeader bean. - newLeader.refresh(); - newLeader.promote(); + } + + @Transactional + private synchronized void persistLocalInstancePromotion( + HighAvailabilityConfig config, PlatformInstance localInstance) { + // Mark the failover timestamp. + config.updateLastFailover(); + // Only one leader can be at a time. Demote the remote record first. + config.getRemoteInstances().forEach(PlatformInstance::demote); + localInstance.refresh(); + localInstance.promote(); + // Start the new backup schedule. + start(); + // Finally, switch the prometheus configuration to read from swamper targets directly. + switchPrometheusToStandalone(); + oneOffSync(); } /** @@ -246,8 +364,11 @@ public void promoteLocalInstance(PlatformInstance newLeader) { * @param config the local HA Config model * @param newInstances the JSON payload received from the leader instance */ - public Set importPlatformInstances( - HighAvailabilityConfig config, List newInstances) { + @Transactional + public synchronized Set importPlatformInstances( + HighAvailabilityConfig config, + List newInstances, + Date requestLastFailover) { String localAddress = config.getLocal().get().getAddress(); // Get list of request payload addresses. @@ -261,7 +382,21 @@ public Set importPlatformInstances( "Current instance (%s) not found in Sync request %s", localAddress, newAddrs)); } + // Get the leader instance. It must be present as the leader sends the request. + PlatformInstance leaderInstance = + newInstances.stream() + .filter(PlatformInstance::getIsLeader) + .findFirst() + .orElseThrow( + () -> + new PlatformServiceException( + BAD_REQUEST, "Leader must be included by the sender")); + + validateSwitchLeaderRequestForStaleness( + config, leaderInstance.getAddress(), requestLastFailover); + List existingInstances = config.getInstances(); + // Get list of existing addresses. Set existingAddrs = existingInstances.stream().map(PlatformInstance::getAddress).collect(Collectors.toSet()); @@ -274,19 +409,49 @@ public Set importPlatformInstances( // Import the new instances, or update existing ones. return newInstances.stream() - .map(replicationHelper::processImportedInstance) + .map(this::processImportedInstance) .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toSet()); } + @Transactional + private synchronized Optional processImportedInstance(PlatformInstance i) { + Optional config = HighAvailabilityConfig.get(); + if (config.isPresent()) { + // Ensure the previous leader is marked as a follower to avoid uniqueness violation. + if (i.getIsLeader()) { + Optional existingLeader = config.get().getLeader(); + if (existingLeader.isPresent() + && !existingLeader.get().getAddress().equals(i.getAddress())) { + existingLeader.get().demote(); + } + } + Optional existingInstance = PlatformInstance.getByAddress(i.getAddress()); + if (existingInstance.isPresent()) { + // Since we sync instances after sending backups, the leader instance has the source of + // truth as to when the last backup has been successfully sent to followers. + existingInstance.get().setLastBackup(i.getLastBackup()); + existingInstance.get().setIsLeader(i.getIsLeader()); + existingInstance.get().update(); + i = existingInstance.get(); + } else { + i.setIsLocal(false); + i.setConfig(config.get()); + i.save(); + } + return Optional.of(i); + } + return Optional.empty(); + } + public boolean testConnection( HighAvailabilityConfig config, String address, boolean acceptAnyCertificate) { boolean result = replicationHelper.testConnection( config, config.getClusterKey(), address, acceptAnyCertificate); if (!result) { - log.error("Error testing connection to " + address); + log.error("Error testing connection to {}", address); } return result; } @@ -306,7 +471,7 @@ public boolean sendBackup(PlatformInstance remoteInstance) { }) .orElse(false); if (!result) { - log.error("Error sending platform backup to " + remoteInstance.getAddress()); + log.error("Error sending platform backup to {}", remoteInstance.getAddress()); // Clear version mismatch metric replicationHelper.clearMetrics(config, remoteInstance.getAddress()); } @@ -320,68 +485,87 @@ public void oneOffSync() { } } + private boolean precheckSyncCondition(HighAvailabilityConfig config) { + Optional localInstance = config.getLocal(); + if (!localInstance.isPresent()) { + log.error(NO_LOCAL_INSTANCE_MSG); + return false; + } + // No point in taking a backup if there is no one to send it to. + if (config.getRemoteInstances().isEmpty()) { + log.debug("Skipping HA cluster sync..."); + return false; + } + Optional leader = config.getLeader(); + if (!leader.isPresent()) { + log.warn("No leader is found"); + return false; + } + if (!leader.get().getUuid().equals(localInstance.get().getUuid())) { + log.debug("Skipping sync because the local instance is not the leader"); + return false; + } + return true; + } + private synchronized void sync() { try { HighAvailabilityConfig.get() .ifPresent( config -> { try { - if (!config.getLocal().isPresent()) { - log.error(NO_LOCAL_INSTANCE_MSG); + if (!precheckSyncCondition(config)) { return; } - + Optional localInstance = config.getLocal(); List remoteInstances = config.getRemoteInstances(); - // No point in taking a backup if there is no one to send it to. - if (remoteInstances.isEmpty()) { - log.debug("Skipping HA cluster sync..."); - - return; - } - - // Create the platform backup. - if (!this.createBackup()) { - log.error("Error creating platform backup"); - - return; + AtomicBoolean backupCreated = new AtomicBoolean(); + try { + // Create the platform backup. + if (createBackup()) { + backupCreated.set(true); + // Update local last backup time since creating the backup succeeded. + localInstance.get().updateLastBackup(); + } else { + log.error("Error creating platform backup"); + } + } catch (Exception e) { + log.error("Error creating platform backup", e); } - - config - .getLocal() - .ifPresent( - localInstance -> { - // Update local last backup time since creating the backup succeeded. - localInstance.updateLastBackup(); - remoteInstances.forEach( - instance -> { - try { - Date lastLastBackup = instance.getLastBackup(); - instance.updateLastBackup(localInstance.getLastBackup()); - if (!sendBackup(instance)) { - instance.updateLastBackup(lastLastBackup); - } - } catch (Exception e) { - log.error( - "Exception {} sending backup to instance {}", - e.getMessage(), - instance.getAddress()); - } - try { - if (!replicationHelper.syncToRemoteInstance(instance)) { - replicationHelper.clearMetrics(config, instance.getAddress()); - log.error( - "Error syncing config to remote instance {}", - instance.getAddress()); - } - } catch (Exception e) { - log.error( - "Exception {} syncing config to remote instance {}", - e.getMessage(), - instance.getAddress()); - } - }); - }); - // Export metric on last backup + remoteInstances.forEach( + instance -> { + try { + // Sync first before taking the backup to propagate the config faster. + // TODO Put this on a different schedule to sync faster? + if (replicationHelper.syncToRemoteInstance(instance)) { + if (backupCreated.get()) { + try { + Date lastLastBackup = instance.getLastBackup(); + instance.updateLastBackup(localInstance.get().getLastBackup()); + if (!sendBackup(instance)) { + instance.updateLastBackup(lastLastBackup); + } + } catch (Exception e) { + log.error( + "Exception {} sending backup to instance {}", + e.getMessage(), + instance.getAddress()); + } + } + } else { + replicationHelper.clearMetrics(config, instance.getAddress()); + log.error( + "Error syncing config to remote instance {}", + instance.getAddress()); + } + } catch (Exception e) { + log.error( + "Exception {} syncing config to remote instance {}", + e.getMessage(), + instance.getAddress()); + } + }); + // Export metric on last backup. remoteInstances.stream() .forEach( instance -> { @@ -591,7 +775,7 @@ boolean createBackup() { ShellResponse response = replicationHelper.runCommand(new CreatePlatformBackupParams()); if (response.code != 0) { - log.error("Backup failed: " + response.message); + log.error("Backup failed: {}", response.message); } return response.code == 0; @@ -605,15 +789,44 @@ boolean createBackup() { */ public boolean restoreBackup(File input) { log.info("Restoring platform backup..."); - ShellResponse response = replicationHelper.runCommand(new RestorePlatformBackupParams(input)); if (response.code != 0) { - log.error("Restore failed: " + response.message); + log.error("Restore failed: {}", response.message); } else { + log.info("Platform backup restored successfully"); + DB.cacheManager().clearAll(); // Sync the files stored in DB to FS in case restore is successful. fileDataService.syncFileData(AppConfigHelper.getStoragePath(), true); } return response.code == 0; } + + /** + * Save the HA config to a local JSON file generally before a restore of the DB. + * + * @param config the current local HA config. + * @return the path to the file. + */ + public synchronized Path saveLocalHighAvailabilityConfig(HighAvailabilityConfig config) { + return replicationHelper.saveLocalHighAvailabilityConfig(config); + } + + /** + * Reads the HA config from the local JSON file that was saved earlier. + * + * @return the HA config. + */ + public synchronized Optional maybeGetLocalHighAvailabilityConfig() { + return replicationHelper.maybeGetLocalHighAvailabilityConfig(); + } + + /** + * Deletes the local HA config file that was saved earlier. + * + * @return true if deleted, else false. + */ + public synchronized boolean deleteLocalHighAvailabilityConfig() { + return replicationHelper.deleteLocalHighAvailabilityConfig(); + } } diff --git a/managed/src/main/java/com/yugabyte/yw/common/utils/FileUtils.java b/managed/src/main/java/com/yugabyte/yw/common/utils/FileUtils.java index d1a59d28bc31..e723069cf0cc 100644 --- a/managed/src/main/java/com/yugabyte/yw/common/utils/FileUtils.java +++ b/managed/src/main/java/com/yugabyte/yw/common/utils/FileUtils.java @@ -14,11 +14,13 @@ import java.nio.charset.StandardCharsets; import java.nio.file.DirectoryStream; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -94,7 +96,10 @@ public static List listFiles(Path backupDir, String pattern) throws IOExce .map(Path::toFile) .sorted(File::compareTo) .collect(Collectors.toList()); + } catch (NoSuchFileException e) { + // Ignore error if the path does not exist. } + return Collections.emptyList(); } public static void moveFile(Path source, Path destination) throws IOException { diff --git a/managed/src/main/java/com/yugabyte/yw/controllers/HAController.java b/managed/src/main/java/com/yugabyte/yw/controllers/HAController.java index be48880ccc23..60ff58acbdf3 100644 --- a/managed/src/main/java/com/yugabyte/yw/controllers/HAController.java +++ b/managed/src/main/java/com/yugabyte/yw/controllers/HAController.java @@ -12,7 +12,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.inject.Inject; -import com.yugabyte.yw.common.ApiResponse; +import com.yugabyte.yw.common.PlatformServiceException; import com.yugabyte.yw.common.Util; import com.yugabyte.yw.common.ha.PlatformReplicationManager; import com.yugabyte.yw.common.rbac.PermissionInfo.Action; @@ -35,7 +35,6 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import play.data.Form; import play.libs.Json; import play.mvc.Http; import play.mvc.Result; @@ -47,7 +46,6 @@ public class HAController extends AuthenticatedController { @Inject private PlatformReplicationManager replicationManager; - // TODO: (Daniel) - This could be a task @AuthzPath({ @RequiredPermissionOnResource( requiredPermission = @@ -56,19 +54,16 @@ public class HAController extends AuthenticatedController { action = Action.SUPER_ADMIN_ACTIONS), resourceLocation = @Resource(path = Util.CUSTOMERS, sourceType = SourceType.ENDPOINT)) }) - public Result createHAConfig(Http.Request request) { + public synchronized Result createHAConfig(Http.Request request) { try { - Form formData = - formFactory.getFormDataOrBadRequest(request, HAConfigFormData.class); + HAConfigFormData formData = parseJsonAndValidate(request, HAConfigFormData.class); if (HighAvailabilityConfig.get().isPresent()) { LOG.error("An HA Config already exists"); - - return ApiResponse.error(BAD_REQUEST, "An HA Config already exists"); + throw new PlatformServiceException(BAD_REQUEST, "An HA Config already exists"); } HighAvailabilityConfig config = - HighAvailabilityConfig.create( - formData.get().cluster_key, formData.get().accept_any_certificate); + HighAvailabilityConfig.create(formData.cluster_key, formData.accept_any_certificate); auditService() .createAuditEntryWithReqBody( request, @@ -78,8 +73,10 @@ public Result createHAConfig(Http.Request request) { return PlatformResults.withData(config); } catch (Exception e) { LOG.error("Error creating HA config", e); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error creating HA config"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException(INTERNAL_SERVER_ERROR, "Error creating HA config"); } } @@ -110,8 +107,10 @@ public Result getHAConfig() { return PlatformResults.withData(resp); } catch (Exception e) { LOG.error("Error retrieving HA config", e); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error retrieving HA config"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException(INTERNAL_SERVER_ERROR, "Error retrieving HA config"); } } @@ -125,21 +124,20 @@ public Result getHAConfig() { }) public Result editHAConfig(UUID configUUID, Http.Request request) { try { - Optional config = HighAvailabilityConfig.get(configUUID); + Optional config = HighAvailabilityConfig.maybeGet(configUUID); if (!config.isPresent()) { - return ApiResponse.error(NOT_FOUND, "Invalid config UUID"); + throw new PlatformServiceException(NOT_FOUND, "Invalid config UUID"); } - Form formData = - formFactory.getFormDataOrBadRequest(request, HAConfigFormData.class); + HAConfigFormData formData = parseJsonAndValidate(request, HAConfigFormData.class); // Validate when changing from true to false - if (!formData.get().accept_any_certificate && config.get().getAcceptAnyCertificate()) { + if (!formData.accept_any_certificate && config.get().getAcceptAnyCertificate()) { List remoteInstances = config.get().getRemoteInstances(); for (PlatformInstance follower : remoteInstances) { if (!replicationManager.testConnection( config.get(), follower.getAddress(), false /* acceptAnyCertificate */)) { - return ApiResponse.error( + throw new PlatformServiceException( INTERNAL_SERVER_ERROR, "Error testing certificate connection to remote instance " + follower.getAddress()); } @@ -147,7 +145,7 @@ public Result editHAConfig(UUID configUUID, Http.Request request) { } replicationManager.stop(); HighAvailabilityConfig.update( - config.get(), formData.get().cluster_key, formData.get().accept_any_certificate); + config.get(), formData.cluster_key, formData.accept_any_certificate); replicationManager.start(); auditService() .createAuditEntryWithReqBody( @@ -155,8 +153,10 @@ public Result editHAConfig(UUID configUUID, Http.Request request) { return PlatformResults.withData(config); } catch (Exception e) { LOG.error("Error updating cluster key", e); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error updating cluster key"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException(INTERNAL_SERVER_ERROR, "Error updating cluster key"); } } @@ -171,9 +171,9 @@ public Result editHAConfig(UUID configUUID, Http.Request request) { }) public Result deleteHAConfig(UUID configUUID, Http.Request request) { try { - Optional config = HighAvailabilityConfig.get(configUUID); + Optional config = HighAvailabilityConfig.maybeGet(configUUID); if (!config.isPresent()) { - return ApiResponse.error(NOT_FOUND, "Invalid config UUID"); + throw new PlatformServiceException(NOT_FOUND, "Invalid config UUID"); } Optional localInstance = config.get().getLocal(); @@ -191,8 +191,10 @@ public Result deleteHAConfig(UUID configUUID, Http.Request request) { return ok(); } catch (Exception e) { LOG.error("Error deleting HA config", e); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error deleting HA config"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException(INTERNAL_SERVER_ERROR, "Error deleting HA config"); } } @@ -210,8 +212,10 @@ public Result generateClusterKey() { return ok(Json.newObject().put("cluster_key", clusterKey)); } catch (Exception e) { LOG.error("Error generating cluster key", e); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error generating cluster key"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException(INTERNAL_SERVER_ERROR, "Error generating cluster key"); } } } diff --git a/managed/src/main/java/com/yugabyte/yw/controllers/InternalHAController.java b/managed/src/main/java/com/yugabyte/yw/controllers/InternalHAController.java index 0e1866e5ca15..9f9276ead0bf 100644 --- a/managed/src/main/java/com/yugabyte/yw/controllers/InternalHAController.java +++ b/managed/src/main/java/com/yugabyte/yw/controllers/InternalHAController.java @@ -11,9 +11,8 @@ package com.yugabyte.yw.controllers; import com.google.inject.Inject; -import com.yugabyte.yw.common.ApiResponse; import com.yugabyte.yw.common.ConfigHelper; -import com.yugabyte.yw.common.ConfigHelper.ConfigType; +import com.yugabyte.yw.common.PlatformServiceException; import com.yugabyte.yw.common.Util; import com.yugabyte.yw.common.ValidatingFormFactory; import com.yugabyte.yw.common.config.GlobalConfKeys; @@ -30,9 +29,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FilenameUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import play.libs.Files; import play.libs.Files.TemporaryFile; import play.mvc.Controller; @@ -41,24 +39,22 @@ import play.mvc.With; @With(HAAuthenticator.class) +@Slf4j public class InternalHAController extends Controller { - public static final Logger LOG = LoggerFactory.getLogger(InternalHAController.class); - - @Inject private RuntimeConfGetter runtimeConfGetter; - + private final RuntimeConfGetter runtimeConfGetter; private final PlatformReplicationManager replicationManager; private final ValidatingFormFactory formFactory; - private final ConfigHelper configHelper; @Inject InternalHAController( + RuntimeConfGetter runtimeConfGetter, PlatformReplicationManager replicationManager, ValidatingFormFactory formFactory, ConfigHelper configHelper) { + this.runtimeConfGetter = runtimeConfGetter; this.replicationManager = replicationManager; this.formFactory = formFactory; - this.configHelper = configHelper; } private String getClusterKey(Http.Request request) { @@ -71,59 +67,48 @@ public Result getHAConfigByClusterKey(Http.Request request) { HighAvailabilityConfig.getByClusterKey(this.getClusterKey(request)); if (!config.isPresent()) { - return ApiResponse.error(NOT_FOUND, "Could not find HA Config by cluster key"); + throw new PlatformServiceException(NOT_FOUND, "Could not find HA Config by cluster key"); } return PlatformResults.withData(config.get()); } catch (Exception e) { - LOG.error("Error retrieving HA config"); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error retrieving HA config"); + log.error("Error retrieving HA config"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException(INTERNAL_SERVER_ERROR, "Error retrieving HA config"); } } // TODO: Change this to accept ObjectNode instead of ArrayNode in request body - public Result syncInstances(long timestamp, Http.Request request) { + public synchronized Result syncInstances(long timestamp, Http.Request request) { + log.debug("Received request to sync instances from {}", request.remoteAddress()); Optional config = HighAvailabilityConfig.getByClusterKey(this.getClusterKey(request)); if (!config.isPresent()) { - return ApiResponse.error(NOT_FOUND, "Invalid config UUID"); + throw new PlatformServiceException(NOT_FOUND, "Invalid config UUID"); } Optional localInstance = config.get().getLocal(); if (!localInstance.isPresent()) { - LOG.warn("No local instance configured"); - - return ApiResponse.error(BAD_REQUEST, "No local instance configured"); + log.warn("No local instance configured"); + throw new PlatformServiceException(BAD_REQUEST, "No local instance configured"); } - if (localInstance.get().getIsLeader()) { - LOG.warn( + log.warn( "Rejecting request to import instances due to this process being designated a leader"); - - return ApiResponse.error(BAD_REQUEST, "Cannot import instances for a leader"); + throw new PlatformServiceException(BAD_REQUEST, "Cannot import instances for a leader"); } - - Date requestLastFailover = new Date(timestamp); - Date localLastFailover = config.get().getLastFailover(); - - // Reject the request if coming from a platform instance that was failed over to earlier. - if (localLastFailover != null && localLastFailover.after(requestLastFailover)) { - LOG.warn("Rejecting request to import instances due to request lastFailover being stale"); - - return ApiResponse.error(BAD_REQUEST, "Cannot import instances from stale leader"); - } - String content = request.body().asBytes().utf8String(); List newInstances = Util.parseJsonArray(content, PlatformInstance.class); Set processedInstances = - replicationManager.importPlatformInstances(config.get(), newInstances); + replicationManager.importPlatformInstances(config.get(), newInstances, new Date(timestamp)); return PlatformResults.withData(processedInstances); } - public Result syncBackups(Http.Request request) throws Exception { + public synchronized Result syncBackups(Http.Request request) throws Exception { Http.MultipartFormData body = request.body().asMultipartFormData(); Map reqParams = body.asFormUrlEncoded(); @@ -135,7 +120,7 @@ public Result syncBackups(Http.Request request) throws Exception { && senders.length == 1 && ybaVersions.length == 1) || (reqParams.size() == 2 && leaders.length == 1 && senders.length == 1))) { - return ApiResponse.error( + throw new PlatformServiceException( BAD_REQUEST, "Expected exactly 2 (leader, sender) or 3 (leader, sender, ybaversion) arguments in " + "'application/x-www-form-urlencoded' data part. Received: " @@ -143,7 +128,7 @@ public Result syncBackups(Http.Request request) throws Exception { } Http.MultipartFormData.FilePart filePart = body.getFile("backup"); if (filePart == null) { - return ApiResponse.error(BAD_REQUEST, "backup file not found in request"); + throw new PlatformServiceException(BAD_REQUEST, "backup file not found in request"); } String fileName = FilenameUtils.getName(filePart.getFilename()); TemporaryFile temporaryFile = filePart.getRef(); @@ -151,29 +136,29 @@ public Result syncBackups(Http.Request request) throws Exception { String sender = senders[0]; if (!leader.equals(sender)) { - return ApiResponse.error( + throw new PlatformServiceException( BAD_REQUEST, "Sender: " + sender + " does not match leader: " + leader); } Optional config = HighAvailabilityConfig.getByClusterKey(this.getClusterKey(request)); if (!config.isPresent()) { - return ApiResponse.error(BAD_REQUEST, "Could not find HA Config"); + throw new PlatformServiceException(BAD_REQUEST, "Could not find HA Config"); } Optional localInstance = config.get().getLocal(); if (localInstance.isPresent() && leader.equals(localInstance.get().getAddress())) { - return ApiResponse.error( + throw new PlatformServiceException( BAD_REQUEST, "Backup originated on the node itself. Leader: " + leader); } if (ybaVersions.length == 1) { String ybaVersion = ybaVersions[0]; if (Util.compareYbVersions(ybaVersion, Util.getYbaVersion()) > 0) { - return ApiResponse.error( + throw new PlatformServiceException( BAD_REQUEST, String.format( - "Can not sync backup from leader on higher version %s to follower on lower version" + "Cannot sync backup from leader on higher version %s to follower on lower version" + " %s", ybaVersion, Util.getYbaVersion())); } @@ -188,67 +173,42 @@ public Result syncBackups(Http.Request request) throws Exception { // TODO: (Daniel) - Need to cleanup backups in non-current leader dir too. replicationManager.cleanupReceivedBackups(leaderUrl); return YBPSuccess.withMessage("File uploaded"); - } else { - return ApiResponse.error(INTERNAL_SERVER_ERROR, "failed to copy backup"); } + throw new PlatformServiceException(INTERNAL_SERVER_ERROR, "Failed to copy backup"); } - public Result demoteLocalLeader(long timestamp, boolean promote, Http.Request request) { + /** This is invoked by the remote peer to demote this local leader. */ + public synchronized Result demoteLocalLeader( + long timestamp, boolean promote, Http.Request request) { try { - LOG.info("Received request to demote local instance."); + log.debug("Received request to demote local instance from {}", request.remoteAddress()); + DemoteInstanceFormData formData = + formFactory.getFormDataOrBadRequest(request, DemoteInstanceFormData.class).get(); Optional config = - HighAvailabilityConfig.getByClusterKey(this.getClusterKey(request)); + HighAvailabilityConfig.getByClusterKey(getClusterKey(request)); if (!config.isPresent()) { - LOG.warn("No HA configuration configured, skipping request"); - - return ApiResponse.error(NOT_FOUND, "Invalid config UUID"); + log.warn("No HA configuration configured, skipping request"); + throw new PlatformServiceException(NOT_FOUND, "Invalid config UUID"); } - - DemoteInstanceFormData formData = - formFactory.getFormDataOrBadRequest(request, DemoteInstanceFormData.class).get(); - Optional localInstance = config.get().getLocal(); - if (!localInstance.isPresent()) { - LOG.warn("No local instance configured"); - - return ApiResponse.error(BAD_REQUEST, "No local instance configured"); - } - - Date requestLastFailover = new Date(timestamp); - Date localLastFailover = config.get().getLastFailover(); - - // Reject the request if coming from a platform instance that was failed over to earlier. - if (localLastFailover != null && localLastFailover.after(requestLastFailover)) { - LOG.warn("Rejecting demote request due to request lastFailover being stale"); - - return ApiResponse.error(BAD_REQUEST, "Rejecting demote request from stale leader"); - } else if (localLastFailover == null || localLastFailover.before(requestLastFailover)) { - // Otherwise, update the last failover timestamp and proceed with demotion request. - config.get().updateLastFailover(requestLastFailover); + log.warn("No local instance configured"); + throw new PlatformServiceException(BAD_REQUEST, "No local instance configured"); } - // Demote the local instance. - replicationManager.demoteLocalInstance(localInstance.get(), formData.leader_address); - - String version = - configHelper - .getConfig(ConfigType.YugawareMetadata) - .getOrDefault("version", "UNKNOWN") - .toString(); - - localInstance.get().setYbaVersion(version); - + replicationManager.demoteLocalInstance( + config.get(), localInstance.get(), formData.leader_address, new Date(timestamp)); // Only restart YBA when demote comes from promote call, not from periodic sync if (promote && runtimeConfGetter.getGlobalConf(GlobalConfKeys.haShutdownLevel) > 1) { Util.shutdownYbaProcess(5); } - return PlatformResults.withData(localInstance); } catch (Exception e) { - LOG.error("Error demoting platform instance", e); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error demoting platform instance"); + log.error("Error demoting platform instance", e); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException(INTERNAL_SERVER_ERROR, "Error demoting platform instance"); } } } diff --git a/managed/src/main/java/com/yugabyte/yw/controllers/PlatformInstanceController.java b/managed/src/main/java/com/yugabyte/yw/controllers/PlatformInstanceController.java index 73c344fe19c7..9eea1a15d0cb 100644 --- a/managed/src/main/java/com/yugabyte/yw/controllers/PlatformInstanceController.java +++ b/managed/src/main/java/com/yugabyte/yw/controllers/PlatformInstanceController.java @@ -38,7 +38,6 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import play.data.Form; import play.mvc.Http; import play.mvc.Result; @@ -63,39 +62,33 @@ public class PlatformInstanceController extends AuthenticatedController { public Result createInstance(UUID configUUID, Http.Request request) { Optional config = HighAvailabilityConfig.getOrBadRequest(configUUID); - Form formData = - formFactory.getFormDataOrBadRequest(request, PlatformInstanceFormData.class); - + PlatformInstanceFormData formData = + parseJsonAndValidate(request, PlatformInstanceFormData.class); // Cannot create a remote instance before creating a local instance. - if (!formData.get().is_local && !config.get().getLocal().isPresent()) { + if (!formData.is_local && !config.get().getLocal().isPresent()) { throw new PlatformServiceException( BAD_REQUEST, "Cannot create a remote platform instance before creating local platform instance"); // Cannot create a remote instance if local instance is follower. - } else if (!formData.get().is_local && !config.get().isLocalLeader()) { + } else if (!formData.is_local && !config.get().isLocalLeader()) { throw new PlatformServiceException( BAD_REQUEST, "Cannot create a remote platform instance on a follower platform instance"); // Cannot create multiple local platform instances. - } else if (formData.get().is_local && config.get().getLocal().isPresent()) { + } else if (formData.is_local && config.get().getLocal().isPresent()) { throw new PlatformServiceException(BAD_REQUEST, "Local platform instance already exists"); // Cannot create multiple leader platform instances. - } else if (formData.get().is_leader && config.get().isLocalLeader()) { + } else if (formData.is_leader && config.get().isLocalLeader()) { throw new PlatformServiceException(BAD_REQUEST, "Leader platform instance already exists"); - } else if (!formData.get().is_local + } else if (!formData.is_local && !replicationManager.testConnection( - config.get(), - formData.get().getCleanAddress(), - config.get().getAcceptAnyCertificate())) { + config.get(), formData.getCleanAddress(), config.get().getAcceptAnyCertificate())) { throw new PlatformServiceException( BAD_REQUEST, "Standby YBA instance is unreachable or hasn't been configured yet"); } PlatformInstance instance = PlatformInstance.create( - config.get(), - formData.get().getCleanAddress(), - formData.get().is_leader, - formData.get().is_local); + config.get(), formData.getCleanAddress(), formData.is_leader, formData.is_local); // Mark this instance as "failed over to" initially since it is a leader instance. if (instance.getIsLeader()) { @@ -185,7 +178,7 @@ public Result getLocal(UUID configUUID) { action = Action.SUPER_ADMIN_ACTIONS), resourceLocation = @Resource(path = Util.CUSTOMERS, sourceType = SourceType.ENDPOINT)) }) - public Result promoteInstance( + public synchronized Result promoteInstance( UUID configUUID, UUID instanceUUID, String curLeaderAddr, boolean force, Http.Request request) throws java.net.MalformedURLException { Optional config = HighAvailabilityConfig.getOrBadRequest(configUUID); @@ -208,8 +201,8 @@ public Result promoteInstance( throw new PlatformServiceException(BAD_REQUEST, "Cannot promote a leader platform instance"); } - Form formData = - formFactory.getFormDataOrBadRequest(request, RestorePlatformBackupFormData.class); + RestorePlatformBackupFormData formData = + parseJsonAndValidate(request, RestorePlatformBackupFormData.class); if (StringUtils.isBlank(curLeaderAddr)) { Optional leaderInstance = config.get().getLeader(); @@ -232,7 +225,7 @@ public Result promoteInstance( // Make sure the backup file provided exists. Optional backup = replicationManager.listBackups(new URL(curLeaderAddr)).stream() - .filter(f -> f.getName().equals(formData.get().backup_file)) + .filter(f -> f.getName().equals(formData.backup_file)) .findFirst(); if (!backup.isPresent()) { throw new PlatformServiceException(BAD_REQUEST, "Could not find backup file"); @@ -241,8 +234,13 @@ public Result promoteInstance( // Cache local instance address before restore so we can query to new corresponding model. String localInstanceAddr = instance.get().getAddress(); + // Save the local HA config before it is wiped out. + replicationManager.saveLocalHighAvailabilityConfig(config.get()); + // Restore the backup. - backup.ifPresent(replicationManager::restoreBackup); + if (!replicationManager.restoreBackup(backup.get())) { + throw new PlatformServiceException(BAD_REQUEST, "Could not restore backup"); + } // Handle any incomplete tasks that may be leftover from the backup that was restored. taskManager.handleAllPendingTasks(); @@ -251,11 +249,6 @@ public Result promoteInstance( PlatformInstance.getByAddress(localInstanceAddr) .ifPresent(replicationManager::promoteLocalInstance); - // Start the new backup schedule. - replicationManager.start(); - - // Finally, switch the prometheus configuration to read from swamper targets directly. - replicationManager.switchPrometheusToStandalone(); auditService() .createAuditEntry( request, @@ -263,8 +256,6 @@ public Result promoteInstance( instanceUUID.toString(), Audit.ActionType.Promote); - replicationManager.oneOffSync(); - if (runtimeConfGetter.getGlobalConf(GlobalConfKeys.haShutdownLevel) > 0) { Util.shutdownYbaProcess(5); } diff --git a/managed/src/main/java/com/yugabyte/yw/controllers/PlatformReplicationController.java b/managed/src/main/java/com/yugabyte/yw/controllers/PlatformReplicationController.java index a262fc0163dd..ee93bc56035f 100644 --- a/managed/src/main/java/com/yugabyte/yw/controllers/PlatformReplicationController.java +++ b/managed/src/main/java/com/yugabyte/yw/controllers/PlatformReplicationController.java @@ -12,7 +12,7 @@ package com.yugabyte.yw.controllers; import com.google.inject.Inject; -import com.yugabyte.yw.common.ApiResponse; +import com.yugabyte.yw.common.PlatformServiceException; import com.yugabyte.yw.common.Util; import com.yugabyte.yw.common.ha.PlatformReplicationManager; import com.yugabyte.yw.common.rbac.PermissionInfo.Action; @@ -38,7 +38,6 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import play.data.Form; import play.mvc.Http; import play.mvc.Result; @@ -58,19 +57,19 @@ public class PlatformReplicationController extends AuthenticatedController { }) public Result startPeriodicBackup(UUID configUUID, Http.Request request) { try { - Optional config = HighAvailabilityConfig.get(configUUID); + Optional config = HighAvailabilityConfig.maybeGet(configUUID); if (!config.isPresent()) { - return ApiResponse.error(NOT_FOUND, "Invalid config UUID"); + throw new PlatformServiceException(NOT_FOUND, "Invalid config UUID"); } - Form formData = - formFactory.getFormDataOrBadRequest(request, PlatformBackupFrequencyFormData.class); + PlatformBackupFrequencyFormData formData = + parseJsonAndValidate(request, PlatformBackupFrequencyFormData.class); if (!config.get().isLocalLeader()) { - return ApiResponse.error(BAD_REQUEST, "This platform instance is not a leader"); + throw new PlatformServiceException(BAD_REQUEST, "This platform instance is not a leader"); } - Duration frequency = Duration.ofMillis(formData.get().frequency_milliseconds); + Duration frequency = Duration.ofMillis(formData.frequency_milliseconds); // Restart the backup schedule with the new frequency. auditService() @@ -82,8 +81,11 @@ public Result startPeriodicBackup(UUID configUUID, Http.Request request) { return ok(replicationManager.setFrequencyStartAndEnable(frequency)); } catch (Exception e) { LOG.error("Error starting backup schedule", e); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error starting replication schedule"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException( + INTERNAL_SERVER_ERROR, "Error starting replication schedule"); } } @@ -97,9 +99,9 @@ public Result startPeriodicBackup(UUID configUUID, Http.Request request) { }) public Result stopPeriodicBackup(UUID configUUID, Http.Request request) { try { - Optional config = HighAvailabilityConfig.get(configUUID); + Optional config = HighAvailabilityConfig.maybeGet(configUUID); if (!config.isPresent()) { - return ApiResponse.error(NOT_FOUND, "Invalid config UUID"); + throw new PlatformServiceException(NOT_FOUND, "Invalid config UUID"); } return ok(replicationManager.stopAndDisable()); @@ -111,7 +113,11 @@ public Result stopPeriodicBackup(UUID configUUID, Http.Request request) { Audit.TargetType.HABackup, configUUID.toString(), Audit.ActionType.StopPeriodicBackup); - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error stopping replication schedule"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException( + INTERNAL_SERVER_ERROR, "Error stopping replication schedule"); } } @@ -125,16 +131,19 @@ public Result stopPeriodicBackup(UUID configUUID, Http.Request request) { }) public Result getBackupInfo(UUID configUUID) { try { - Optional config = HighAvailabilityConfig.get(configUUID); + Optional config = HighAvailabilityConfig.maybeGet(configUUID); if (!config.isPresent()) { - return ApiResponse.error(NOT_FOUND, "Invalid config UUID"); + throw new PlatformServiceException(NOT_FOUND, "Invalid config UUID"); } return ok(replicationManager.getBackupInfo()); } catch (Exception e) { LOG.error("Error retrieving backup frequency", e); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error retrieving replication frequency"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException( + INTERNAL_SERVER_ERROR, "Error retrieving replication frequency"); } } @@ -149,14 +158,15 @@ public Result getBackupInfo(UUID configUUID) { public Result listBackups(UUID configUUID, String leaderAddr) { try { if (StringUtils.isBlank(leaderAddr)) { - Optional config = HighAvailabilityConfig.get(configUUID); + Optional config = HighAvailabilityConfig.maybeGet(configUUID); if (!config.isPresent()) { - return ApiResponse.error(NOT_FOUND, "Invalid config UUID"); + throw new PlatformServiceException(NOT_FOUND, "Invalid config UUID"); } Optional leaderInstance = config.get().getLeader(); if (!leaderInstance.isPresent()) { - return ApiResponse.error(BAD_REQUEST, "Could not find leader platform instance"); + throw new PlatformServiceException( + BAD_REQUEST, "Could not find leader platform instance"); } leaderAddr = leaderInstance.get().getAddress(); @@ -170,8 +180,10 @@ public Result listBackups(UUID configUUID, String leaderAddr) { return PlatformResults.withData(backups); } catch (Exception e) { LOG.error("Error listing backups", e); - - return ApiResponse.error(INTERNAL_SERVER_ERROR, "Error listing backups"); + if (e instanceof PlatformServiceException) { + throw (PlatformServiceException) e; + } + throw new PlatformServiceException(INTERNAL_SERVER_ERROR, "Error listing backups"); } } } diff --git a/managed/src/main/java/com/yugabyte/yw/models/HighAvailabilityConfig.java b/managed/src/main/java/com/yugabyte/yw/models/HighAvailabilityConfig.java index 79e66402a5c5..3af0c4299028 100644 --- a/managed/src/main/java/com/yugabyte/yw/models/HighAvailabilityConfig.java +++ b/managed/src/main/java/com/yugabyte/yw/models/HighAvailabilityConfig.java @@ -57,8 +57,6 @@ public class HighAvailabilityConfig extends Model { private static final Finder find = new Finder(HighAvailabilityConfig.class) {}; - private static long BACKUP_DISCONNECT_TIME_MILLIS = 15 * (60 * 1000); - @JsonIgnore private final int id = 1; @Id @@ -153,13 +151,12 @@ public static void update( config.update(); } - @Deprecated - public static Optional get(UUID uuid) { + public static Optional maybeGet(UUID uuid) { return Optional.ofNullable(find.byId(uuid)); } public static Optional getOrBadRequest(UUID uuid) { - Optional config = get(uuid); + Optional config = maybeGet(uuid); if (!config.isPresent()) { throw new PlatformServiceException(BAD_REQUEST, "Invalid config UUID"); } @@ -211,7 +208,7 @@ public GlobalState computeGlobalState() { } else if (this.anyConnected()) { return GlobalState.Operational; } - } else if (this.isFollower()) { + } else if (isFollower()) { if (this.instances.size() == 1) { return GlobalState.AwaitingReplicas; } else if (this.getLocal().isPresent()) { diff --git a/managed/src/main/java/com/yugabyte/yw/models/PlatformInstance.java b/managed/src/main/java/com/yugabyte/yw/models/PlatformInstance.java index 275933fbee46..6f63b7c28672 100644 --- a/managed/src/main/java/com/yugabyte/yw/models/PlatformInstance.java +++ b/managed/src/main/java/com/yugabyte/yw/models/PlatformInstance.java @@ -39,7 +39,6 @@ import jakarta.persistence.TemporalType; import jakarta.persistence.Transient; import java.io.IOException; -import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Date; import java.util.Optional; @@ -62,9 +61,6 @@ public class PlatformInstance extends Model { private static final Logger LOG = LoggerFactory.getLogger(PlatformInstance.class); - private static final SimpleDateFormat TIMESTAMP_FORMAT = - new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy"); - private static long BACKUP_DISCONNECT_TIME_MILLIS = 15 * (60 * 1000); @Id @@ -258,7 +254,7 @@ public PlatformInstance deserialize(JsonParser jp, DeserializationContext ctxt) PlatformInstance instance = new PlatformInstance(); instance.uuid = UUID.fromString(json.get("uuid").asText()); UUID configUUID = UUID.fromString(json.get("config_uuid").asText()); - instance.config = HighAvailabilityConfig.get(configUUID).orElse(null); + instance.config = HighAvailabilityConfig.maybeGet(configUUID).orElse(null); instance.address = json.get("address").asText(); instance.setIsLeader(json.get("is_leader").asBoolean()); instance.setIsLocal(json.get("is_local").asBoolean()); diff --git a/managed/src/test/java/com/yugabyte/yw/common/ha/PlatformReplicationManagerTest.java b/managed/src/test/java/com/yugabyte/yw/common/ha/PlatformReplicationManagerTest.java index d9f78b829a9a..369ebe226cce 100644 --- a/managed/src/test/java/com/yugabyte/yw/common/ha/PlatformReplicationManagerTest.java +++ b/managed/src/test/java/com/yugabyte/yw/common/ha/PlatformReplicationManagerTest.java @@ -207,7 +207,8 @@ public void testCreatePlatformBackupParams( mockPlatformScheduler, mockReplicationUtil, mockFileDataService, - mockPrometheusConfigHelper); + mockPrometheusConfigHelper, + mockConfigHelper); List expectedCommandArgs = getExpectedPlatformBackupCommandArgs( @@ -259,7 +260,8 @@ public void testGCBackups(int numToRetain) throws Exception { mockPlatformScheduler, mockReplicationUtil, mockFileDataService, - mockPrometheusConfigHelper)); + mockPrometheusConfigHelper, + mockConfigHelper)); List backups = backupManager.listBackups(testUrl); assertEquals(3, backups.size()); diff --git a/managed/src/test/java/com/yugabyte/yw/controllers/HAControllerTest.java b/managed/src/test/java/com/yugabyte/yw/controllers/HAControllerTest.java index 91bef9377e46..f6ea85ae6183 100644 --- a/managed/src/test/java/com/yugabyte/yw/controllers/HAControllerTest.java +++ b/managed/src/test/java/com/yugabyte/yw/controllers/HAControllerTest.java @@ -13,6 +13,7 @@ import static com.yugabyte.yw.common.AssertHelper.assertBadRequest; import static com.yugabyte.yw.common.AssertHelper.assertNotFound; import static com.yugabyte.yw.common.AssertHelper.assertOk; +import static com.yugabyte.yw.common.AssertHelper.assertPlatformException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static play.test.Helpers.contentAsString; @@ -72,8 +73,10 @@ public void testCreateMultipleHAConfigNotAllowed() { assertOk(createResult); clusterKey = Json.parse(contentAsString(createClusterKey())).get("cluster_key").asText(); - body = Json.newObject().put("cluster_key", clusterKey); - Result createResult2 = doRequestWithAuthTokenAndBody("POST", uri, authToken, body); + JsonNode badBody = Json.newObject().put("cluster_key", clusterKey); + Result createResult2 = + assertPlatformException( + () -> doRequestWithAuthTokenAndBody("POST", uri, authToken, badBody)); assertBadRequest(createResult2, "An HA Config already exists"); } diff --git a/managed/src/test/java/com/yugabyte/yw/controllers/InternalHAControllerTest.java b/managed/src/test/java/com/yugabyte/yw/controllers/InternalHAControllerTest.java index 81364ff7f0a6..fc3a2b3a32f7 100644 --- a/managed/src/test/java/com/yugabyte/yw/controllers/InternalHAControllerTest.java +++ b/managed/src/test/java/com/yugabyte/yw/controllers/InternalHAControllerTest.java @@ -173,7 +173,9 @@ public void testSyncInstancesNoLocalInstances() { JsonNode haConfigJson = createHAConfig(); String clusterKey = haConfigJson.get("cluster_key").asText(); String uri = SYNC_ENDPOINT + new Date().getTime(); - Result syncResult = doRequestWithHATokenAndBody("PUT", uri, clusterKey, Json.newObject()); + Result syncResult = + assertPlatformException( + () -> doRequestWithHATokenAndBody("PUT", uri, clusterKey, Json.newObject())); assertBadRequest(syncResult, "No local instance configured"); } @@ -186,7 +188,9 @@ public void testSyncInstancesNoLocalFollower() { haConfigJson = getHAConfig(); config = Json.fromJson(haConfigJson, HighAvailabilityConfig.class); String uri = SYNC_ENDPOINT + config.getLastFailover().getTime(); - Result syncResult = doRequestWithHATokenAndBody("PUT", uri, clusterKey, Json.newObject()); + Result syncResult = + assertPlatformException( + () -> doRequestWithHATokenAndBody("PUT", uri, clusterKey, Json.newObject())); assertBadRequest(syncResult, "Cannot import instances for a leader"); } @@ -426,7 +430,9 @@ public void testSyncBackups_badRequest() throws IOException { String clusterKey = createInstances(haConfigJson, leaderAddr); File fakeDump = createFakeDump(); Result result = - sendBackupSyncRequest(clusterKey, leaderAddr, fakeDump, "http://different.sender"); + assertPlatformException( + () -> + sendBackupSyncRequest(clusterKey, leaderAddr, fakeDump, "http://different.sender")); assertBadRequest( result, "Sender: http://different.sender does not match leader: http://leader.yw.com"); @@ -475,8 +481,9 @@ public void testSyncInstancesFromStaleLeader() { body.add(Json.toJson(i1)); body.add(localInstance); String uri = SYNC_ENDPOINT + 0; - Result syncResult = doRequestWithHATokenAndBody("PUT", uri, clusterKey, body); - assertBadRequest(syncResult, "Cannot import instances from stale leader"); + Result syncResult = + assertPlatformException(() -> doRequestWithHATokenAndBody("PUT", uri, clusterKey, body)); + assertBadRequest(syncResult, "Cannot accept request from stale leader http://abcdef.com"); } @Test @@ -513,8 +520,9 @@ public void testDemoteLocalInstanceStaleFailover() { assertTrue(config.isLocalLeader()); String uri = DEMOTE_ENDPOINT + staleFailover.getTime(); JsonNode body = Json.newObject().put("leader_address", "http://1.2.3.4"); - Result demoteResult = doRequestWithHATokenAndBody("PUT", uri, clusterKey, body); - assertBadRequest(demoteResult, "Rejecting demote request from stale leader"); + Result demoteResult = + assertPlatformException(() -> doRequestWithHATokenAndBody("PUT", uri, clusterKey, body)); + assertBadRequest(demoteResult, "Cannot accept request from stale leader http://1.2.3.4"); haConfigJson = getHAConfig(); config = Json.fromJson(haConfigJson, HighAvailabilityConfig.class); assertTrue(config.isLocalLeader()); diff --git a/managed/src/test/java/com/yugabyte/yw/controllers/PlatformInstanceControllerTest.java b/managed/src/test/java/com/yugabyte/yw/controllers/PlatformInstanceControllerTest.java index 02398ee25e66..39831587fa6a 100644 --- a/managed/src/test/java/com/yugabyte/yw/controllers/PlatformInstanceControllerTest.java +++ b/managed/src/test/java/com/yugabyte/yw/controllers/PlatformInstanceControllerTest.java @@ -39,6 +39,7 @@ import com.yugabyte.yw.models.PlatformInstance; import com.yugabyte.yw.models.Users; import java.time.Duration; +import java.util.Comparator; import java.util.List; import java.util.UUID; import java.util.function.Predicate; @@ -279,7 +280,7 @@ public void testLongAddress() { assertOk(createResult); // Exceed dns length 264 (total address length is 272 > 263) - final String expectedError = "Maximum length is 263"; + final String expectedError = "error.maxLength"; String longAddress = "http://" + StringUtils.repeat("abcdefghi.", 26) + ".com/"; createResult = assertPlatformException(() -> createPlatformInstance(configUUID, longAddress, true, false)); @@ -389,14 +390,26 @@ public void testPromoteLocalInstance() throws InterruptedException { JsonNode haConfigJson = createHAConfig(); HighAvailabilityConfig config = Json.fromJson(haConfigJson, HighAvailabilityConfig.class); UUID configUUID = config.getUuid(); - Result createResult = createPlatformInstance(configUUID, "http://abc.com/", true, true); - assertOk(createResult); - createResult = createPlatformInstance(configUUID, "http://def.com/", false, false); - assertOk(createResult); - JsonNode instanceJson = Json.parse(contentAsString(createResult)); - PlatformInstance instance = Json.fromJson(instanceJson, PlatformInstance.class); - - platformReplicationManager.promoteLocalInstance(instance); + // Local active instance. + Result localCreateResult = createPlatformInstance(configUUID, "http://abc.com/", true, true); + assertOk(localCreateResult); + // Remote, standby instance. + Result remoteCreateResult = createPlatformInstance(configUUID, "http://def.com/", false, false); + assertOk(remoteCreateResult); + PlatformInstance remoteInstance = + Json.fromJson(Json.parse(contentAsString(remoteCreateResult)), PlatformInstance.class); + config.refresh(); + // Clone the config to create a remote HA config. + HighAvailabilityConfig remoteHAConfig = + Json.fromJson(Json.toJson(config), HighAvailabilityConfig.class); + // Change isLocal for the remote node. + remoteHAConfig.getInstances().stream() + .sorted(Comparator.comparing(PlatformInstance::getIsLocal).reversed()) + .forEach(i -> i.updateIsLocal(!i.getIsLocal())); + // Write out the updated remote config. + platformReplicationManager.saveLocalHighAvailabilityConfig(remoteHAConfig); + + platformReplicationManager.promoteLocalInstance(remoteInstance); platformReplicationManager.setFrequencyStartAndEnable(Duration.ofSeconds(1)); diff --git a/managed/src/test/java/com/yugabyte/yw/models/HighAvailabilityConfigTest.java b/managed/src/test/java/com/yugabyte/yw/models/HighAvailabilityConfigTest.java index 3126a528a573..59c41fd06c19 100644 --- a/managed/src/test/java/com/yugabyte/yw/models/HighAvailabilityConfigTest.java +++ b/managed/src/test/java/com/yugabyte/yw/models/HighAvailabilityConfigTest.java @@ -116,7 +116,7 @@ public void testLeaderHaGlobalConfigStateAwaitingReplicas() { assertOk(createResult); createResult = createPlatformInstance(configUUID, "http://def.com/", false, false); assertOk(createResult); - config = HighAvailabilityConfig.get(configUUID).get(); + config = HighAvailabilityConfig.maybeGet(configUUID).get(); assertEquals(GlobalState.AwaitingReplicas, config.computeGlobalState()); } @@ -133,7 +133,7 @@ public void testLeaderHaGlobalConfigStateOperational() { Optional remote = PlatformInstance.get(UUID.fromString(remotePlatformJson.get("uuid").asText())); remote.get().updateLastBackup(new Date()); - config = HighAvailabilityConfig.get(configUUID).get(); + config = HighAvailabilityConfig.maybeGet(configUUID).get(); assertEquals(GlobalState.Operational, config.computeGlobalState()); } @@ -150,7 +150,7 @@ public void testLeaderHaGlobalConfigStateError() { Optional remote = PlatformInstance.get(UUID.fromString(remotePlatformJson.get("uuid").asText())); remote.get().updateLastBackup(new Date(System.currentTimeMillis() - (30 * 60 * 1000))); - config = HighAvailabilityConfig.get(configUUID).get(); + config = HighAvailabilityConfig.maybeGet(configUUID).get(); assertEquals(GlobalState.Error, config.computeGlobalState()); } @@ -175,7 +175,7 @@ public void testLeaderHaGlobalConfigStateWarning() { remote = PlatformInstance.get(UUID.fromString(remotePlatformJson.get("uuid").asText())); remote.get().updateLastBackup(new Date()); - config = HighAvailabilityConfig.get(configUUID).get(); + config = HighAvailabilityConfig.maybeGet(configUUID).get(); assertEquals(GlobalState.Warning, config.computeGlobalState()); } @@ -187,7 +187,7 @@ public void testFollowerHaGlobalConfigStateAwaitingReplicas() { Result createResult = createPlatformInstance(configUUID, "http://abc.com/", true, false); assertOk(createResult); - config = HighAvailabilityConfig.get(configUUID).get(); + config = HighAvailabilityConfig.maybeGet(configUUID).get(); assertEquals(GlobalState.AwaitingReplicas, config.computeGlobalState()); } @@ -211,7 +211,7 @@ public void testFollowerHaGlobalConfigStateOperational() { remote.promote(); local.updateLastBackup(new Date()); - config = HighAvailabilityConfig.get(configUUID).get(); + config = HighAvailabilityConfig.maybeGet(configUUID).get(); assertEquals(GlobalState.StandbyConnected, config.computeGlobalState()); } @@ -235,7 +235,7 @@ public void testFollowerHaGlobalConfigStateError() { remote.promote(); local.updateLastBackup(new Date(System.currentTimeMillis() - (30 * 60 * 1000))); - config = HighAvailabilityConfig.get(configUUID).get(); + config = HighAvailabilityConfig.maybeGet(configUUID).get(); assertEquals(GlobalState.StandbyDisconnected, config.computeGlobalState()); } }