Skip to content

Commit

Permalink
[PLAT-16468] Both instances can go into standby if sync happens from …
Browse files Browse the repository at this point in the history
…the demoted peer right after promotion

Summary: This fixes the original issue, adds more robustness, and cleans up the code by removing deprecated methods as much as possible.

Test Plan:
1. Modified code to run sync right after demotion. Demotion works fine.
2. Shut down the active. Forcefully make the standby active. Start the old active YBA. After the next sync in the background, the old YBA transitions to standby.

Will be testing more scenarios.

This diff is deployed on https://10.9.121.30/ and https://10.9.89.26/

Reviewers: muthu, nbhatia, dshubin

Reviewed By: muthu

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D41266
  • Loading branch information
nkhogen committed Jan 18, 2025
1 parent bd0cdfe commit 1e004c4
Show file tree
Hide file tree
Showing 15 changed files with 598 additions and 374 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.yugabyte.yw.models.Customer;
import com.yugabyte.yw.models.CustomerTask;
import com.yugabyte.yw.models.CustomerTask.TargetType;
import com.yugabyte.yw.models.HighAvailabilityConfig;
import com.yugabyte.yw.models.PendingConsistencyCheck;
import com.yugabyte.yw.models.Restore;
import com.yugabyte.yw.models.RestoreKeyspace;
Expand Down Expand Up @@ -358,7 +359,6 @@ public void handleAllPendingTasks() {
CustomerTask customerTask = CustomerTask.get(row.getLong("customer_task_id"));
handlePendingTask(customerTask, taskInfo);
});

for (Customer customer : Customer.getAll()) {
// Change the DeleteInProgress backups state to QueuedForDeletion
Backup.findAllBackupWithState(
Expand Down Expand Up @@ -460,6 +460,10 @@ private void enableLoadBalancer(Universe universe) {
}

public void handleAutoRetryAbortedTasks() {
if (HighAvailabilityConfig.isFollower()) {
LOG.info("Skipping auto-retry of tasks because this YBA is a follower");
return;
}
Duration timeWindow =
confGetter.getGlobalConf(GlobalConfKeys.autoRetryTasksOnYbaRestartTimeWindow);
if (timeWindow.isZero()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigValue;
import com.typesafe.config.ConfigValueFactory;
import com.yugabyte.yw.common.*;
import com.yugabyte.yw.common.AppConfigHelper;
import com.yugabyte.yw.common.PrometheusConfigHelper;
import com.yugabyte.yw.common.PrometheusConfigManager;
import com.yugabyte.yw.common.ShellProcessHandler;
import com.yugabyte.yw.common.ShellResponse;
import com.yugabyte.yw.common.SwamperHelper;
import com.yugabyte.yw.common.config.GlobalConfKeys;
import com.yugabyte.yw.common.config.RuntimeConfGetter;
import com.yugabyte.yw.common.config.impl.SettableRuntimeConfigFactory;
Expand All @@ -32,6 +37,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
Expand All @@ -41,23 +47,23 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pekko.actor.Cancellable;
import org.apache.velocity.Template;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.RuntimeConstants;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import play.libs.Json;

@Singleton
@Slf4j
public class PlatformReplicationHelper {
private static final Logger LOG = LoggerFactory.getLogger(PlatformReplicationHelper.class);

public static final String BACKUP_DIR = "platformBackups";
public static final String REPLICATION_DIR = "platformReplication";
static final String BACKUP_FILE_PATTERN = "backup_*.tgz";
static final String LOCAL_HA_CONFIG_JSON_FILE = "local_ha_config.json";

// Config keys:
private static final String REPLICATION_SCHEDULE_ENABLED_KEY =
Expand All @@ -78,8 +84,6 @@ public class PlatformReplicationHelper {

private final SettableRuntimeConfigFactory runtimeConfigFactory;

private final ApiHelper apiHelper;

private final PlatformInstanceClientFactory remoteClientFactory;

private final MetricUrlProvider metricUrlProvider;
Expand All @@ -94,15 +98,13 @@ public class PlatformReplicationHelper {
public PlatformReplicationHelper(
RuntimeConfGetter confGetter,
SettableRuntimeConfigFactory runtimeConfigFactory,
ApiHelper apiHelper,
PlatformInstanceClientFactory remoteClientFactory,
ShellProcessHandler shellProcessHandler,
MetricUrlProvider metricUrlProvider,
PrometheusConfigHelper prometheusConfigHelper,
PrometheusConfigManager prometheusConfigManager) {
this.confGetter = confGetter;
this.runtimeConfigFactory = runtimeConfigFactory;
this.apiHelper = apiHelper;
this.remoteClientFactory = remoteClientFactory;
this.shellProcessHandler = shellProcessHandler;
this.metricUrlProvider = metricUrlProvider;
Expand Down Expand Up @@ -238,40 +240,43 @@ private void writeFederatedPrometheusConfig(String remoteAddr, File file, boolea
// Merge the template with the context.
template.merge(context, writer);
} catch (Exception e) {
LOG.error("Error creating federated prometheus config file");
log.error("Error creating federated prometheus config file");
}
}

// This makes calls to the remote instances to demote.
boolean demoteRemoteInstance(PlatformInstance remoteInstance, boolean promote) {
if (remoteInstance.getIsLocal()) {
log.warn("Cannot perform demoteRemoteInstance action on a local instance");
return false;
}
HighAvailabilityConfig config = remoteInstance.getConfig();
try (PlatformInstanceClient client =
this.remoteClientFactory.getClient(
config.getClusterKey(),
remoteInstance.getAddress(),
config.getAcceptAnyCertificateOverrides())) {
if (remoteInstance.getIsLocal()) {
LOG.warn("Cannot perform demoteRemoteInstance action on a local instance");
return false;
if (remoteInstance.getIsLeader()) {
// Ensure all local records for remote instances are set to follower state.
remoteInstance.demote();
}

// Ensure all local records for remote instances are set to follower state.
remoteInstance.demote();

return config
.getLocal()
.map(
localInstance -> {
// Send step down request to remote instance.
log.info(
"Demoting remote instance {} in favor of {}",
remoteInstance.getAddress(),
localInstance.getAddress());
client.demoteInstance(
localInstance.getAddress(), config.getLastFailover().getTime(), promote);

return true;
})
.orElse(false);
} catch (Exception e) {
LOG.error("Error demoting remote platform instance {}", remoteInstance.getAddress(), e);
log.error("Error demoting remote platform instance {}", remoteInstance.getAddress(), e);
}

return false;
}

Expand Down Expand Up @@ -300,27 +305,27 @@ boolean exportPlatformInstances(HighAvailabilityConfig config, String remoteInst
client.syncInstances(config.getLastFailover().getTime(), instancesJson);
return true;
} catch (Exception e) {
LOG.error(
log.error(
"Error exporting local platform instances to remote instance " + remoteInstanceAddr, e);
}
return false;
}

void switchPrometheusToFederated(URL remoteAddr) {
try {
LOG.info("Switching local prometheus to federated or updating it");
log.info("Switching local prometheus to federated or updating it");
File configFile = prometheusConfigHelper.getPrometheusConfigFile();
File configDir = configFile.getParentFile();
File previousConfigFile = new File(configDir, "previous_prometheus.yml");

if (!configDir.exists() && !configDir.mkdirs()) {
LOG.warn("Could not create output dir {}", configDir);
log.warn("Could not create output dir {}", configDir);
return;
}

// Move the old file if it hasn't already been moved.
if (configFile.exists() && !previousConfigFile.exists()) {
LOG.info("Creating previous_prometheus.yml from existing prometheus.yml");
log.info("Creating previous_prometheus.yml from existing prometheus.yml");
FileUtils.moveFile(configFile.toPath(), previousConfigFile.toPath());
}

Expand All @@ -333,18 +338,18 @@ void switchPrometheusToFederated(URL remoteAddr) {
String federatedPoint = remoteAddr.getHost() + ":" + federatedURL.getPort();
boolean https = federatedURL.getScheme().equalsIgnoreCase("https");
this.writeFederatedPrometheusConfig(federatedPoint, configFile, https);
LOG.info("Wrote federated prometheus config.");
log.info("Wrote federated prometheus config.");

// Reload the config.
prometheusConfigHelper.reloadPrometheusConfig();
} catch (Exception e) {
LOG.error("Error switching prometheus config to read from {}", remoteAddr.getHost(), e);
log.error("Error switching prometheus config to read from {}", remoteAddr.getHost(), e);
}
}

void switchPrometheusToStandalone() {
try {
LOG.info("Switching prometheus to standalone.");
log.info("Switching prometheus to standalone.");
File configFile = prometheusConfigHelper.getPrometheusConfigFile();
File configDir = configFile.getParentFile();
File previousConfigFile = new File(configDir, "previous_prometheus.yml");
Expand All @@ -356,9 +361,9 @@ void switchPrometheusToStandalone() {
FileUtils.moveFile(previousConfigFile.toPath(), configFile.toPath());
prometheusConfigHelper.reloadPrometheusConfig();
prometheusConfigManager.updateK8sScrapeConfigs();
LOG.info("Moved previous_prometheus.yml to prometheus.yml");
log.info("Moved previous_prometheus.yml to prometheus.yml");
} catch (Exception e) {
LOG.error("Error switching prometheus config to standalone", e);
log.error("Error switching prometheus config to standalone", e);
}
}

Expand Down Expand Up @@ -439,7 +444,7 @@ void cleanupBackups(List<File> backups, int numToRetain) {
return;
}

LOG.info("Garbage collecting {} backups", numBackups - numToRetain);
log.info("Garbage collecting {} backups", numBackups - numToRetain);
backups.subList(0, numBackups - numToRetain).forEach(File::delete);
}

Expand All @@ -448,7 +453,7 @@ public Optional<File> getMostRecentBackup() {
return FileUtils.listFiles(this.getBackupDir(), BACKUP_FILE_PATTERN).stream()
.max(Comparator.comparingLong(File::lastModified));
} catch (Exception exception) {
LOG.error("Could not locate recent backup", exception);
log.error("Could not locate recent backup", exception);
}

return Optional.empty();
Expand All @@ -459,19 +464,19 @@ public void cleanupCreatedBackups() {
List<File> backups = FileUtils.listFiles(this.getBackupDir(), BACKUP_FILE_PATTERN);
// Keep 3 most recent backups to avoid interference between continuous backups and HA
this.cleanupBackups(backups, 3);
} catch (IOException ioException) {
LOG.warn("Failed to list or delete backups");
} catch (IOException e) {
log.warn("Failed to list or delete backups", e);
}
}

boolean syncToRemoteInstance(PlatformInstance remoteInstance) {
HighAvailabilityConfig config = remoteInstance.getConfig();
String remoteAddr = remoteInstance.getAddress();
LOG.debug("Syncing data to " + remoteAddr + "...");
log.debug("Syncing data to {}...", remoteAddr);

// Ensure that the remote instance is demoted if this instance is the most current leader.
if (!this.demoteRemoteInstance(remoteInstance, false)) {
LOG.error("Error demoting remote instance " + remoteAddr);
log.error("Error demoting remote instance {}", remoteAddr);
return false;
}

Expand All @@ -484,61 +489,72 @@ List<File> listBackups(URL leader) {
Path backupDir = this.getReplicationDirFor(leader.getHost());

if (!backupDir.toFile().exists() || !backupDir.toFile().isDirectory()) {
LOG.debug(String.format("%s directory does not exist", backupDir.toFile().getName()));

return new ArrayList<>();
log.debug(String.format("%s directory does not exist", backupDir.toFile().getName()));
return new ArrayList<>(1);
}

return FileUtils.listFiles(backupDir, PlatformReplicationHelper.BACKUP_FILE_PATTERN);
} catch (Exception e) {
LOG.error("Error listing backups for platform instance {}", leader.getHost(), e);

log.error("Error listing backups for platform instance {}", leader.getHost(), e);
return new ArrayList<>();
}
}

void cleanupReceivedBackups(URL leader, int numToRetain) {
List<File> backups = this.listBackups(leader);
this.cleanupBackups(backups, numToRetain);
}

Optional<PlatformInstance> processImportedInstance(PlatformInstance i) {
Optional<HighAvailabilityConfig> config = HighAvailabilityConfig.get();
Optional<PlatformInstance> existingInstance = PlatformInstance.getByAddress(i.getAddress());
if (config.isPresent()) {
// Ensure the previous leader is marked as a follower to avoid uniqueness violation.
if (i.getIsLeader()) {
Optional<PlatformInstance> existingLeader = config.get().getLeader();
if (existingLeader.isPresent()
&& !existingLeader.get().getAddress().equals(i.getAddress())) {
existingLeader.get().demote();
}
/**
 * Saves the HA config to a JSON file before it is wiped out in backup restore.
 *
 * <p>The file is named {@code LOCAL_HA_CONFIG_JSON_FILE} and lives in the directory returned by
 * {@code getReplicationDirFor("localhost")}. An existing file at that path is deleted before the
 * new content is written.
 *
 * @param config the HA config to serialize.
 * @return the path of the file that was written.
 * @throws RuntimeException wrapping any exception raised while preparing or writing the file.
 */
Path saveLocalHighAvailabilityConfig(HighAvailabilityConfig config) {
  Path localConfigDir = getReplicationDirFor("localhost");
  Path localHaConfigPath = localConfigDir.resolve(LOCAL_HA_CONFIG_JSON_FILE);
  try {
    File dir = localConfigDir.toFile();
    // mkdirs() also returns false when the directory already exists, so only warn when the
    // directory is really missing afterwards. The write below reports the hard failure.
    if (!dir.mkdirs() && !dir.isDirectory()) {
      log.warn("Could not create directory {} for the local HA config", localConfigDir);
    }
    File file = localHaConfigPath.toFile();
    // delete() reports failure through its return value only; surface it instead of ignoring it.
    if (file.exists() && !file.delete()) {
      log.warn("Could not delete the existing local HA config file {}", localHaConfigPath);
    }
    Json.mapper().writeValue(file, config);
    return localHaConfigPath;
  } catch (Exception e) {
    log.error("Failed to write local HA config to file {}", localHaConfigPath, e);
    throw new RuntimeException(e);
  }
}

if (existingInstance.isPresent()) {
// Since we sync instances after sending backups, the leader instance has the source of
// truth as to when the last backup has been successfully sent to followers.
existingInstance.get().setLastBackup(i.getLastBackup());
existingInstance.get().setIsLeader(i.getIsLeader());
existingInstance.get().update();
i = existingInstance.get();
} else {
i.setIsLocal(false);
i.setConfig(config.get());
i.save();
/**
 * Reads the HA config back from the local JSON file.
 *
 * @return the deserialized config, or an empty Optional when the file does not exist, is not a
 *     regular file, or reading it fails (the failure is logged as a warning).
 */
Optional<HighAvailabilityConfig> maybeGetLocalHighAvailabilityConfig() {
  Path haConfigPath = getReplicationDirFor("localhost").resolve(LOCAL_HA_CONFIG_JSON_FILE);
  File haConfigFile = haConfigPath.toFile();
  try {
    if (!haConfigFile.exists() || !haConfigFile.isFile()) {
      return Optional.empty();
    }
    return Optional.of(Json.mapper().readValue(haConfigFile, HighAvailabilityConfig.class));
  } catch (Exception e) {
    log.warn("Failed to read local HA config from file {}", haConfigFile, e);
    return Optional.empty();
  }
}

return Optional.of(i);
/**
 * Deletes the local HA config file if it exists.
 *
 * @return {@code true} if a file was deleted; {@code false} if there was no file or the deletion
 *     failed with an {@link IOException} (which is logged as a warning).
 */
boolean deleteLocalHighAvailabilityConfig() {
  Path haConfigPath = getReplicationDirFor("localhost").resolve(LOCAL_HA_CONFIG_JSON_FILE);
  boolean deleted = false;
  try {
    deleted = Files.deleteIfExists(haConfigPath);
  } catch (IOException e) {
    log.warn("Fail to delete the local HA config file {}", haConfigPath, e);
  }
  return deleted;
}

return Optional.empty();
/**
 * Garbage collects the backups listed for the given leader.
 *
 * @param leader URL of the leader whose backups are listed.
 * @param numToRetain retention count passed through to {@code cleanupBackups}.
 */
void cleanupReceivedBackups(URL leader, int numToRetain) {
  this.cleanupBackups(this.listBackups(leader), numToRetain);
}

public synchronized <T extends PlatformBackupParams> ShellResponse runCommand(T params) {
List<String> commandArgs = params.getCommandArgs();
Map<String, String> extraVars = params.getExtraVars();

LOG.debug("Command to run: [" + String.join(" ", commandArgs) + "]");
log.debug("Command to run: [" + String.join(" ", commandArgs) + "]");

boolean logCmdOutput = isBackupScriptOutputEnabled();
return shellProcessHandler.run(commandArgs, extraVars, logCmdOutput);
Expand Down
Loading

0 comments on commit 1e004c4

Please sign in to comment.