Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1052][FOLLOWUP] Introduce dynamic ConfigService at SystemLevel and TenantLevel #2125

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
}

def dynamicConfigStoreBackend: String = get(DYNAMIC_CONFIG_STORE_BACKEND)
def dynamicConfigRefreshTime: Long = get(DYNAMIC_CONFIG_REFRESH_TIME)
def dynamicConfigRefreshInterval: Long = get(DYNAMIC_CONFIG_REFRESH_INTERVAL)

// //////////////////////////////////////////////////////
// Network //
Expand Down Expand Up @@ -4166,17 +4166,18 @@ object CelebornConf extends Logging {
val DYNAMIC_CONFIG_STORE_BACKEND: ConfigEntry[String] =
buildConf("celeborn.dynamicConfig.store.backend")
.categories("master", "worker")
.doc("Store backend for dynamic config, NONE means disabling dynamic config store")
.doc("Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store.")
.version("0.4.0")
.stringConf
.checkValues(Set("FS", "NONE"))
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set("NONE", "FS"))
.createWithDefault("NONE")

val DYNAMIC_CONFIG_REFRESH_TIME: ConfigEntry[Long] =
buildConf("celeborn.dynamicConfig.refresh.time")
val DYNAMIC_CONFIG_REFRESH_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.dynamicConfig.refresh.interval")
.categories("master", "worker")
.version("0.4.0")
.doc("The time interval for refreshing the corresponding dynamic config periodically")
.doc("Interval for refreshing the corresponding dynamic config periodically.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
}
4 changes: 2 additions & 2 deletions docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ license: |
<!--begin-include-->
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically | 0.4.0 |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config, NONE means disabling dynamic config store | 0.4.0 |
| celeborn.dynamicConfig.refresh.interval | 120s | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store. | 0.4.0 |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 |
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ license: |
<!--begin-include-->
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically | 0.4.0 |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config, NONE means disabling dynamic config store | 0.4.0 |
| celeborn.dynamicConfig.refresh.interval | 120s | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store. | 0.4.0 |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,4 @@ default DynamicConfig getTenantConfig(String tenantId) {
void refreshAllCache();

void shutdown();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@

package org.apache.celeborn.server.common.service.config;

import org.apache.celeborn.common.internal.config.ConfigEntry;
import org.apache.celeborn.common.util.Utils;
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import org.apache.celeborn.common.internal.config.ConfigEntry;
import org.apache.celeborn.common.util.Utils;

/**
* Dynamic configuration is a type of configuration that can be changed at runtime as needed. It can be used at system level/tenant level.
* When applying dynamic configuration, the priority order is as follows: tenant level overrides system level,
* which in turn overrides static configuration(CelebornConf). This means that if a configuration is defined at the tenant level,
* it will be used instead of the system level or static configuration(CelebornConf). If the tenant-level configuration is missing,
* the system-level configuration will be used. If the system-level configuration is also missing, CelebornConf
* will be used as the default value.
* Dynamic configuration is a type of configuration that can be changed at runtime as needed. It can
* be used at system level/tenant level. When applying dynamic configuration, the priority order is
* as follows: tenant level overrides system level, which in turn overrides static
* configuration(CelebornConf). This means that if a configuration is defined at the tenant level,
* it will be used instead of the system level or static configuration(CelebornConf). If the
* tenant-level configuration is missing, the system-level configuration will be used. If the
* system-level configuration is also missing, CelebornConf will be used as the default value.
*/
public abstract class DynamicConfig {
private static final Logger LOG = LoggerFactory.getLogger(DynamicConfig.class);
Expand All @@ -42,26 +44,35 @@ public abstract class DynamicConfig {
public <T> T getWithDefaultValue(
String configKey, T defaultValue, Class<T> finalType, ConfigType configType) {
String configValue = configs.get(configKey);
T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
T formatValue =
configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
if (formatValue == null) {
return defaultValue;
} else {
return formatValue;
}
}

public <T> T getValue(String configKey, ConfigEntry<Object> configEntry, Class<T> finalType, ConfigType configType) {
public <T> T getValue(
String configKey,
ConfigEntry<Object> configEntry,
Class<T> finalType,
ConfigType configType) {
String configValue = configs.get(configKey);
T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
T formatValue =
configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
if (formatValue == null) {
DynamicConfig parentLevelConfig = getParentLevelConfig();
return parentLevelConfig != null? parentLevelConfig.getValue(configKey, configEntry, finalType, configType): null;
return parentLevelConfig != null
? parentLevelConfig.getValue(configKey, configEntry, finalType, configType)
: null;
} else {
return formatValue;
}
}

public <T> T formatValue(String configKey, String configValue, Class<T> finalType, ConfigType configType) {
public <T> T formatValue(
String configKey, String configValue, Class<T> finalType, ConfigType configType) {
try {
if (configValue != null) {
if (ConfigType.BYTES == configType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ public static ConfigService getConfigService(CelebornConf celebornConf) {

return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.celeborn.server.common.service.config;

import org.apache.celeborn.common.util.ThreadUtils;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
Expand All @@ -30,19 +28,21 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import scala.concurrent.duration.Duration;
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import org.apache.celeborn.common.CelebornConf;

import scala.concurrent.duration.Duration;
import org.apache.celeborn.common.util.ThreadUtils;

public class FsConfigServiceImpl implements ConfigService {
private static final Logger LOG = LoggerFactory.getLogger(FsConfigServiceImpl.class);
private CelebornConf celebornConf;
private final CelebornConf celebornConf;
private final AtomicReference<SystemConfig> systemConfigAtomicReference = new AtomicReference<>();
private final AtomicReference<Map<String, TenantConfig>> tenantConfigAtomicReference = new AtomicReference<>(new HashMap<>());
private final AtomicReference<Map<String, TenantConfig>> tenantConfigAtomicReference =
new AtomicReference<>(new HashMap<>());
private static final String CONF_TENANT_ID = "tenantId";
private static final String CONF_LEVEL = "level";
private static final String CONF_CONFIG = "config";
Expand All @@ -52,13 +52,11 @@ public class FsConfigServiceImpl implements ConfigService {

public FsConfigServiceImpl(CelebornConf celebornConf) {
this.celebornConf = celebornConf;
this.systemConfigAtomicReference.set(new SystemConfig(celebornConf));
this.refresh();
long dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshTime();
long dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshInterval();
this.configRefreshService.scheduleWithFixedDelay(
() -> refresh(),
dynamicConfigRefreshTime,
dynamicConfigRefreshTime,
TimeUnit.MILLISECONDS);
this::refresh, dynamicConfigRefreshTime, dynamicConfigRefreshTime, TimeUnit.MILLISECONDS);
}

private synchronized void refresh() {
Expand All @@ -78,7 +76,7 @@ private synchronized void refresh() {
Map<String, String> config =
((Map<String, Object>) settings.get(CONF_CONFIG))
.entrySet().stream()
.collect(Collectors.toMap(a -> a.getKey(), a -> a.getValue().toString()));
.collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString()));
if (ConfigLevel.TENANT.name().equals(level)) {
TenantConfig tenantConfig = new TenantConfig(this, tenantId, config);
tenantConfs.put(tenantId, tenantConfig);
Expand All @@ -88,10 +86,13 @@ private synchronized void refresh() {
}
} catch (Exception e) {
LOG.warn("Refresh dynamic config error: {}", e.getMessage(), e);
return;
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
}

tenantConfigAtomicReference.set(tenantConfs);
systemConfigAtomicReference.set(systemConfig == null ? new SystemConfig(celebornConf) : systemConfig);
if (systemConfig != null) {
systemConfigAtomicReference.set(systemConfig);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.celeborn.server.common.service.config;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.internal.config.ConfigEntry;

import java.util.HashMap;
import java.util.Map;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.internal.config.ConfigEntry;

public class SystemConfig extends DynamicConfig {
private CelebornConf celebornConf;
private final CelebornConf celebornConf;

public SystemConfig(CelebornConf celebornConf, Map<String, String> configs) {
this.celebornConf = celebornConf;
this.configs.putAll(configs);
Expand All @@ -40,9 +41,14 @@ public DynamicConfig getParentLevelConfig() {
return null;
}

public <T> T getValue(String configKey, ConfigEntry<Object> configEntry, Class<T> finalType, ConfigType configType) {
public <T> T getValue(
String configKey,
ConfigEntry<Object> configEntry,
Class<T> finalType,
ConfigType configType) {
String configValue = configs.get(configKey);
T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
T formatValue =
configValue != null ? formatValue(configKey, configValue, finalType, configType) : null;
if (formatValue == null && configEntry != null) {
return convert(finalType, celebornConf.get(configEntry).toString());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
import java.util.Map;

public class TenantConfig extends DynamicConfig {
private String tenantId;
private ConfigService configService;
private final String tenantId;
private final ConfigService configService;

public TenantConfig(ConfigService configService, String tenantId, Map<String, String> configs) {
this.configService = configService;
this.configs.putAll(configs);
this.tenantId = tenantId;
}

public Map<String, String> getConfigs() {
return configs;
public String getTenantId() {
return tenantId;
}

@Override
Expand Down
Loading
Loading