From d344966f165221d10917a7d0e44af8b40c980781 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Thu, 30 Nov 2023 20:04:04 +0800 Subject: [PATCH 1/3] [CELEBORN-1052][FOLLOWUP] Introduce dynamic ConfigService at SystemLevel and TenantLevel --- .../apache/celeborn/common/CelebornConf.scala | 7 +- docs/configuration/master.md | 4 +- docs/configuration/worker.md | 4 +- .../common/service/config/ConfigLevel.java | 0 .../common/service/config/ConfigService.java | 7 +- .../common/service/config/DynamicConfig.java | 41 ++-- .../config/DynamicConfigServiceFactory.java | 2 +- .../service/config/FsConfigServiceImpl.java | 25 ++- .../common/service/config/SystemConfig.java | 18 +- .../common/service/config/TenantConfig.java | 8 +- .../service/config/ConfigServiceSuiteJ.java | 177 ++++++++++++++++++ .../service/config/ConfigServiceSuiteJ.java | 123 ------------ 12 files changed, 242 insertions(+), 174 deletions(-) rename service/src/main/{scala => java}/org/apache/celeborn/server/common/service/config/ConfigLevel.java (100%) rename service/src/main/{scala => java}/org/apache/celeborn/server/common/service/config/ConfigService.java (95%) rename service/src/main/{scala => java}/org/apache/celeborn/server/common/service/config/DynamicConfig.java (74%) rename service/src/main/{scala => java}/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java (99%) rename service/src/main/{scala => java}/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java (90%) rename service/src/main/{scala => java}/org/apache/celeborn/server/common/service/config/SystemConfig.java (84%) rename service/src/main/{scala => java}/org/apache/celeborn/server/common/service/config/TenantConfig.java (90%) create mode 100644 service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java delete mode 100644 service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 75742b48aae..cbfc468067b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -4166,17 +4166,18 @@ object CelebornConf extends Logging { val DYNAMIC_CONFIG_STORE_BACKEND: ConfigEntry[String] = buildConf("celeborn.dynamicConfig.store.backend") .categories("master", "worker") - .doc("Store backend for dynamic config, NONE means disabling dynamic config store") + .doc("Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store.") .version("0.4.0") .stringConf - .checkValues(Set("FS", "NONE")) + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(Set("NONE", "FS")) .createWithDefault("NONE") val DYNAMIC_CONFIG_REFRESH_TIME: ConfigEntry[Long] = buildConf("celeborn.dynamicConfig.refresh.time") .categories("master", "worker") .version("0.4.0") - .doc("The time interval for refreshing the corresponding dynamic config periodically") + .doc("The time interval for refreshing the corresponding dynamic config periodically.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("120s") } diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 025e9c15b3c..25fe42cac51 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -19,8 +19,8 @@ license: | | Key | Default | Description | Since | | --- | ------- | ----------- | ----- | -| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically | 0.4.0 | -| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config, NONE means disabling dynamic config store | 0.4.0 | +| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | +| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store. | 0.4.0 | | celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index aa0e555de2f..696f0d323b4 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -19,8 +19,8 @@ license: | | Key | Default | Description | Since | | --- | ------- | ----------- | ----- | -| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically | 0.4.0 | -| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config, NONE means disabling dynamic config store | 0.4.0 | +| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | +| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store. | 0.4.0 | | celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | | celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 | diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigLevel.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java similarity index 100% rename from service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigLevel.java rename to service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigLevel.java diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigService.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java similarity index 95% rename from service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigService.java rename to service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java index 362d2b71e40..89ce3dd642f 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigService.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java @@ -17,7 +17,7 @@ package org.apache.celeborn.server.common.service.config; -public interface ConfigService { +public interface ConfigService extends AutoCloseable { SystemConfig getSystemConfig(); @@ -33,7 +33,4 @@ default DynamicConfig getTenantConfig(String tenantId) { } void refreshAllCache(); - - void shutdown(); - -} \ No newline at end of file +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java similarity index 74% rename from service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfig.java rename to service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java index 9a051eea7c8..7906d830424 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java @@ -17,21 +17,23 @@ package org.apache.celeborn.server.common.service.config; -import org.apache.celeborn.common.internal.config.ConfigEntry; -import org.apache.celeborn.common.util.Utils; +import java.util.HashMap; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; +import org.apache.celeborn.common.internal.config.ConfigEntry; +import org.apache.celeborn.common.util.Utils; /** - * Dynamic configuration is a type of configuration that can be changed at runtime as needed. It can be used at system level/tenant level. - * When applying dynamic configuration, the priority order is as follows: tenant level overrides system level, - * which in turn overrides static configuration(CelebornConf). This means that if a configuration is defined at the tenant level, - * it will be used instead of the system level or static configuration(CelebornConf). If the tenant-level configuration is missing, - * the system-level configuration will be used. If the system-level configuration is also missing, CelebornConf - * will be used as the default value. + * Dynamic configuration is a type of configuration that can be changed at runtime as needed. It can + * be used at system level/tenant level. When applying dynamic configuration, the priority order is + * as follows: tenant level overrides system level, which in turn overrides static + * configuration(CelebornConf). This means that if a configuration is defined at the tenant level, + * it will be used instead of the system level or static configuration(CelebornConf). If the + * tenant-level configuration is missing, the system-level configuration will be used. If the + * system-level configuration is also missing, CelebornConf will be used as the default value. */ public abstract class DynamicConfig { private static final Logger LOG = LoggerFactory.getLogger(DynamicConfig.class); @@ -42,7 +44,8 @@ public abstract class DynamicConfig { public T getWithDefaultValue( String configKey, T defaultValue, Class finalType, ConfigType configType) { String configValue = configs.get(configKey); - T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null; + T formatValue = + configValue != null ? formatValue(configKey, configValue, finalType, configType) : null; if (formatValue == null) { return defaultValue; } else { @@ -50,18 +53,26 @@ public T getWithDefaultValue( } } - public T getValue(String configKey, ConfigEntry configEntry, Class finalType, ConfigType configType) { + public T getValue( + String configKey, + ConfigEntry configEntry, + Class finalType, + ConfigType configType) { String configValue = configs.get(configKey); - T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null; + T formatValue = + configValue != null ? formatValue(configKey, configValue, finalType, configType) : null; if (formatValue == null) { DynamicConfig parentLevelConfig = getParentLevelConfig(); - return parentLevelConfig != null? parentLevelConfig.getValue(configKey, configEntry, finalType, configType): null; + return parentLevelConfig != null + ? parentLevelConfig.getValue(configKey, configEntry, finalType, configType) + : null; } else { return formatValue; } } - public T formatValue(String configKey, String configValue, Class finalType, ConfigType configType) { + public T formatValue( + String configKey, String configValue, Class finalType, ConfigType configType) { try { if (configValue != null) { if (ConfigType.BYTES == configType) { diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java similarity index 99% rename from service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java rename to service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java index 7346ddc20f9..2d015496625 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java @@ -29,4 +29,4 @@ public static ConfigService getConfigService(CelebornConf celebornConf) { return null; } -} \ No newline at end of file +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java similarity index 90% rename from service/src/main/scala/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java rename to service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java index 7a9b60a88b7..54c22382886 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java @@ -17,8 +17,6 @@ package org.apache.celeborn.server.common.service.config; -import org.apache.celeborn.common.util.ThreadUtils; - import java.io.File; import java.io.FileInputStream; import java.util.HashMap; @@ -30,19 +28,21 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import scala.concurrent.duration.Duration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; import org.apache.celeborn.common.CelebornConf; - -import scala.concurrent.duration.Duration; +import org.apache.celeborn.common.util.ThreadUtils; public class FsConfigServiceImpl implements ConfigService { private static final Logger LOG = LoggerFactory.getLogger(FsConfigServiceImpl.class); - private CelebornConf celebornConf; + private final CelebornConf celebornConf; private final AtomicReference systemConfigAtomicReference = new AtomicReference<>(); - private final AtomicReference> tenantConfigAtomicReference = new AtomicReference<>(new HashMap<>()); + private final AtomicReference> tenantConfigAtomicReference = + new AtomicReference<>(new HashMap<>()); private static final String CONF_TENANT_ID = "tenantId"; private static final String CONF_LEVEL = "level"; private static final String CONF_CONFIG = "config"; @@ -55,10 +55,7 @@ public FsConfigServiceImpl(CelebornConf celebornConf) { this.refresh(); long dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshTime(); this.configRefreshService.scheduleWithFixedDelay( - () -> refresh(), - dynamicConfigRefreshTime, - dynamicConfigRefreshTime, - TimeUnit.MILLISECONDS); + this::refresh, dynamicConfigRefreshTime, dynamicConfigRefreshTime, TimeUnit.MILLISECONDS); } private synchronized void refresh() { @@ -78,7 +75,7 @@ private synchronized void refresh() { Map config = ((Map) settings.get(CONF_CONFIG)) .entrySet().stream() - .collect(Collectors.toMap(a -> a.getKey(), a -> a.getValue().toString())); + .collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString())); if (ConfigLevel.TENANT.name().equals(level)) { TenantConfig tenantConfig = new TenantConfig(this, tenantId, config); tenantConfs.put(tenantId, tenantConfig); @@ -88,10 +85,12 @@ private synchronized void refresh() { } } catch (Exception e) { LOG.warn("Refresh dynamic config error: {}", e.getMessage(), e); + return; } tenantConfigAtomicReference.set(tenantConfs); - systemConfigAtomicReference.set(systemConfig == null ? new SystemConfig(celebornConf) : systemConfig); + systemConfigAtomicReference.set( + systemConfig == null ? new SystemConfig(celebornConf) : systemConfig); } @Override @@ -110,7 +109,7 @@ public void refreshAllCache() { } @Override - public void shutdown() { + public void close() { ThreadUtils.shutdown(configRefreshService, Duration.apply("800ms")); } diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/SystemConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java similarity index 84% rename from service/src/main/scala/org/apache/celeborn/server/common/service/config/SystemConfig.java rename to service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java index ff73bb7f2c2..812414150f7 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/service/config/SystemConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/SystemConfig.java @@ -17,14 +17,15 @@ package org.apache.celeborn.server.common.service.config; -import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.internal.config.ConfigEntry; - import java.util.HashMap; import java.util.Map; +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.internal.config.ConfigEntry; + public class SystemConfig extends DynamicConfig { - private CelebornConf celebornConf; + private final CelebornConf celebornConf; + public SystemConfig(CelebornConf celebornConf, Map configs) { this.celebornConf = celebornConf; this.configs.putAll(configs); @@ -40,9 +41,14 @@ public DynamicConfig getParentLevelConfig() { return null; } - public T getValue(String configKey, ConfigEntry configEntry, Class finalType, ConfigType configType) { + public T getValue( + String configKey, + ConfigEntry configEntry, + Class finalType, + ConfigType configType) { String configValue = configs.get(configKey); - T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null; + T formatValue = + configValue != null ? formatValue(configKey, configValue, finalType, configType) : null; if (formatValue == null && configEntry != null) { return convert(finalType, celebornConf.get(configEntry).toString()); } else { diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/TenantConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java similarity index 90% rename from service/src/main/scala/org/apache/celeborn/server/common/service/config/TenantConfig.java rename to service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java index 26198d6e0b0..c5e4f983c8a 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/service/config/TenantConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/TenantConfig.java @@ -20,8 +20,8 @@ import java.util.Map; public class TenantConfig extends DynamicConfig { - private String tenantId; - private ConfigService configService; + private final String tenantId; + private final ConfigService configService; public TenantConfig(ConfigService configService, String tenantId, Map configs) { this.configService = configService; @@ -29,8 +29,8 @@ public TenantConfig(ConfigService configService, String tenantId, Map getConfigs() { - return configs; + public String getTenantId() { + return tenantId; } @Override diff --git a/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java new file mode 100644 index 00000000000..019ae517d07 --- /dev/null +++ b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.service.config; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.server.common.service.config.DynamicConfig.ConfigType; + +public class ConfigServiceSuiteJ { + + @Test + public void testFsConfig() { + CelebornConf celebornConf = new CelebornConf(); + String file = getClass().getResource("/dynamicConfig.yaml").getFile(); + celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file); + celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_TIME(), 5L); + try (FsConfigServiceImpl fsConfigService = new FsConfigServiceImpl(celebornConf)) { + + verifyConfig(fsConfigService); + + // change -> refresh config + file = getClass().getResource("/dynamicConfig_2.yaml").getFile(); + celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file); + + fsConfigService.refreshAllCache(); + SystemConfig systemConfig = fsConfigService.getSystemConfig(); + + // verify systemConfig's intConf + Integer intConfValue = + systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING); + Assert.assertEquals(intConfValue.intValue(), 100); + + // verify systemConfig's bytesConf -- defer to celebornConf + Long value = + systemConfig.getValue( + CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), + CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1073741824); + } + } + + public void verifyConfig(ConfigService configService) { + // ------------- Verify SystemConfig ----------------- // + SystemConfig systemConfig = configService.getSystemConfig(); + // verify systemConfig's bytesConf -- use systemConfig + Long value = + systemConfig.getValue( + CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), + CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 102400); + + // verify systemConfig's bytesConf -- defer to celebornConf + value = + systemConfig.getValue( + CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), + CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1073741824); + + // verify systemConfig's bytesConf only -- use systemConfig + value = + systemConfig.getValue( + "celeborn.client.push.buffer.initial.size.only", null, Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 10240); + + // verify systemConfig's bytesConf with none + value = + systemConfig.getValue( + "celeborn.client.push.buffer.initial.size.only.none", + null, + Long.TYPE, + ConfigType.BYTES); + Assert.assertNull(value); + + // verify systemConfig's timesConf + value = + systemConfig.getValue("celeborn.test.timeoutMs.only", null, Long.TYPE, ConfigType.TIME_MS); + Assert.assertEquals(value.longValue(), 100000); + + // verify systemConfig's BooleanConf + Boolean booleanConfValue = + systemConfig.getValue( + "celeborn.test.timeoutMs.only", null, Boolean.TYPE, ConfigType.STRING); + Assert.assertFalse(booleanConfValue); + + // verify systemConfig's intConf + Integer intConfValue = + systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING); + Assert.assertEquals(intConfValue.intValue(), 10); + + // ------------- Verify TenantConfig ----------------- // + DynamicConfig tenantConfig = configService.getTenantConfig("tenant_id"); + // verify tenantConfig's bytesConf -- use tenantConf + value = + tenantConfig.getValue( + CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), + CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 10240); + + // verify tenantConfig's bytesConf -- defer to systemConf + value = + tenantConfig.getValue( + CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), + CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1024000); + + // verify tenantConfig's bytesConf -- defer to celebornConf + value = + tenantConfig.getValue( + CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), + CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1073741824); + + // verify tenantConfig's bytesConf only -- use tenantConf + value = + tenantConfig.getValue( + "celeborn.client.push.buffer.initial.size.only", null, Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 102400); + + // verify tenantConfig's bytesConf with none + value = + tenantConfig.getValue( + "celeborn.client.push.buffer.initial.size.only.none", + null, + Long.TYPE, + ConfigType.BYTES); + Assert.assertNull(value); + + DynamicConfig tenantConfigNone = configService.getTenantConfig("tenant_id_none"); + // verify tenantConfig's bytesConf -- defer to systemConf + value = + tenantConfigNone.getValue( + CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), + CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(), + Long.TYPE, + ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1024000); + + // ------------- Verify with defaultValue ----------------- // + value = + tenantConfig.getWithDefaultValue( + "celeborn.client.push.buffer.initial.size.only", 100L, Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 102400); + + Long withDefaultValue = + tenantConfigNone.getWithDefaultValue("none", 10L, Long.TYPE, ConfigType.STRING); + Assert.assertEquals(withDefaultValue.longValue(), 10); + } +} diff --git a/service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java b/service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java deleted file mode 100644 index 47c41b01171..00000000000 --- a/service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.celeborn.server.common.service.config; - -import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.server.common.service.config.ConfigService; -import org.apache.celeborn.server.common.service.config.DynamicConfig; -import org.apache.celeborn.server.common.service.config.DynamicConfig.ConfigType; -import org.apache.celeborn.server.common.service.config.FsConfigServiceImpl; -import org.apache.celeborn.server.common.service.config.SystemConfig; -import org.junit.Assert; -import org.junit.Test; - -public class ConfigServiceSuiteJ { - - @Test - public void testFsConfig() { - CelebornConf celebornConf = new CelebornConf(); - String file = getClass().getResource("/dynamicConfig.yaml").getFile(); - celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file); - celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_TIME(), 5l); - FsConfigServiceImpl fsConfigService = new FsConfigServiceImpl(celebornConf); - - verifyConfig(fsConfigService); - - // change -> refresh config - file = getClass().getResource("/dynamicConfig_2.yaml").getFile(); - celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file); - - fsConfigService.refreshAllCache(); - SystemConfig systemConfig = fsConfigService.getSystemConfig(); - - // verify systemConfig's intConf - Integer intConfValue = systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING); - Assert.assertEquals(intConfValue.intValue(), 100); - - // verify systemConfig's bytesConf -- defer to celebornConf - Long value = systemConfig.getValue(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 1073741824); - } - - public void verifyConfig(ConfigService configService) { - // ------------- Verify SystemConfig ----------------- // - SystemConfig systemConfig = configService.getSystemConfig(); - // verify systemConfig's bytesConf -- use systemConfig - Long value = systemConfig.getValue(CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(), Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 102400); - - // verify systemConfig's bytesConf -- defer to celebornConf - value = systemConfig.getValue(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 1073741824); - - // verify systemConfig's bytesConf only -- use systemConfig - value = systemConfig.getValue("celeborn.client.push.buffer.initial.size.only", null, Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 10240); - - // verify systemConfig's bytesConf with none - value = systemConfig.getValue("celeborn.client.push.buffer.initial.size.only.none", null, Long.TYPE, ConfigType.BYTES); - Assert.assertNull(value); - - // verify systemConfig's timesConf - value = systemConfig.getValue("celeborn.test.timeoutMs.only", null, Long.TYPE, ConfigType.TIME_MS); - Assert.assertEquals(value.longValue(), 100000); - - // verify systemConfig's BooleanConf - Boolean booleanConfValue = systemConfig.getValue("celeborn.test.timeoutMs.only", null, Boolean.TYPE, ConfigType.STRING); - Assert.assertFalse(booleanConfValue); - - // verify systemConfig's intConf - Integer intConfValue = systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING); - Assert.assertEquals(intConfValue.intValue(), 10); - - // ------------- Verify TenantConfig ----------------- // - DynamicConfig tenantConfig = configService.getTenantConfig("tenant_id"); - // verify tenantConfig's bytesConf -- use tenantConf - value = tenantConfig.getValue(CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(), Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 10240); - - // verify tenantConfig's bytesConf -- defer to systemConf - value = tenantConfig.getValue(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(), Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 1024000); - - // verify tenantConfig's bytesConf -- defer to celebornConf - value = tenantConfig.getValue(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 1073741824); - - // verify tenantConfig's bytesConf only -- use tenantConf - value = tenantConfig.getValue("celeborn.client.push.buffer.initial.size.only", null, Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 102400); - - // verify tenantConfig's bytesConf with none - value = tenantConfig.getValue("celeborn.client.push.buffer.initial.size.only.none", null, Long.TYPE, ConfigType.BYTES); - Assert.assertNull(value); - - DynamicConfig tenantConfigNone = configService.getTenantConfig("tenant_id_none"); - // verify tenantConfig's bytesConf -- defer to systemConf - value = tenantConfigNone.getValue(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(), Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 1024000); - - - // ------------- Verify with defaultValue ----------------- // - value = tenantConfig.getWithDefaultValue("celeborn.client.push.buffer.initial.size.only", 100l, Long.TYPE, ConfigType.BYTES); - Assert.assertEquals(value.longValue(), 102400); - - Long withDefaultValue = tenantConfigNone.getWithDefaultValue("none", 10l, Long.TYPE, ConfigType.STRING); - Assert.assertEquals(withDefaultValue.longValue(), 10); - } -} \ No newline at end of file From 280b480c010d67cd131efe8b91d9b0bbdbe3b730 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Sun, 3 Dec 2023 10:30:00 +0800 Subject: [PATCH 2/3] [CELEBORN-1052][FOLLOWUP] Introduce dynamic ConfigService at SystemLevel and TenantLevel --- .../scala/org/apache/celeborn/common/CelebornConf.scala | 8 ++++---- docs/configuration/master.md | 2 +- docs/configuration/worker.md | 2 +- .../server/common/service/config/ConfigService.java | 4 +++- .../server/common/service/config/FsConfigServiceImpl.java | 4 ++-- .../server/common/service/config/ConfigServiceSuiteJ.java | 8 +++++--- 6 files changed, 16 insertions(+), 12 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index cbfc468067b..e56d3aa1a19 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -367,7 +367,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se } def dynamicConfigStoreBackend: String = get(DYNAMIC_CONFIG_STORE_BACKEND) - def dynamicConfigRefreshTime: Long = get(DYNAMIC_CONFIG_REFRESH_TIME) + def dynamicConfigRefreshInterval: Long = get(DYNAMIC_CONFIG_REFRESH_INTERVAL) // ////////////////////////////////////////////////////// // Network // @@ -4173,11 +4173,11 @@ object CelebornConf extends Logging { .checkValues(Set("NONE", "FS")) .createWithDefault("NONE") - val DYNAMIC_CONFIG_REFRESH_TIME: ConfigEntry[Long] = - buildConf("celeborn.dynamicConfig.refresh.time") + val DYNAMIC_CONFIG_REFRESH_INTERVAL: ConfigEntry[Long] = + buildConf("celeborn.dynamicConfig.refresh.interval") .categories("master", "worker") .version("0.4.0") - .doc("The time interval for refreshing the corresponding dynamic config periodically.") + .doc("Interval for refreshing the corresponding dynamic config periodically.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("120s") } diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 25fe42cac51..92ddae7008e 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -19,7 +19,7 @@ license: | | Key | Default | Description | Since | | --- | ------- | ----------- | ----- | -| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | +| celeborn.dynamicConfig.refresh.interval | 120s | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | | celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store. | 0.4.0 | | celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 696f0d323b4..8628d3d135b 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -19,7 +19,7 @@ license: | | Key | Default | Description | Since | | --- | ------- | ----------- | ----- | -| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | +| celeborn.dynamicConfig.refresh.interval | 120s | Interval for refreshing the corresponding dynamic config periodically. | 0.4.0 | | celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config. Available options: NONE, FS. Note: NONE means disabling dynamic config store. | 0.4.0 | | celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | | celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java index 89ce3dd642f..80e02dc23d9 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java @@ -17,7 +17,7 @@ package org.apache.celeborn.server.common.service.config; -public interface ConfigService extends AutoCloseable { +public interface ConfigService { SystemConfig getSystemConfig(); @@ -33,4 +33,6 @@ default DynamicConfig getTenantConfig(String tenantId) { } void refreshAllCache(); + + void shutdown(); } diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java index 54c22382886..bd993ace3c0 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java @@ -53,7 +53,7 @@ public class FsConfigServiceImpl implements ConfigService { public FsConfigServiceImpl(CelebornConf celebornConf) { this.celebornConf = celebornConf; this.refresh(); - long dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshTime(); + long dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshInterval(); this.configRefreshService.scheduleWithFixedDelay( this::refresh, dynamicConfigRefreshTime, dynamicConfigRefreshTime, TimeUnit.MILLISECONDS); } @@ -109,7 +109,7 @@ public void refreshAllCache() { } @Override - public void close() { + public void shutdown() { ThreadUtils.shutdown(configRefreshService, Duration.apply("800ms")); } diff --git a/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java index 019ae517d07..d94db2f024a 100644 --- a/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java +++ b/service/src/test/java/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java @@ -30,9 +30,9 @@ public void testFsConfig() { CelebornConf celebornConf = new CelebornConf(); String file = getClass().getResource("/dynamicConfig.yaml").getFile(); celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file); - celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_TIME(), 5L); - try (FsConfigServiceImpl fsConfigService = new FsConfigServiceImpl(celebornConf)) { - + celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 5L); + FsConfigServiceImpl fsConfigService = new FsConfigServiceImpl(celebornConf); + try { verifyConfig(fsConfigService); // change -> refresh config @@ -55,6 +55,8 @@ public void testFsConfig() { Long.TYPE, ConfigType.BYTES); Assert.assertEquals(value.longValue(), 1073741824); + } finally { + fsConfigService.shutdown(); } } From 60c4666d579e086fb639dc95366615e589e11d73 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Mon, 4 Dec 2023 17:03:32 +0800 Subject: [PATCH 3/3] [CELEBORN-1052][FOLLOWUP] Introduce dynamic ConfigService at SystemLevel and TenantLevel --- .../server/common/service/config/FsConfigServiceImpl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java index bd993ace3c0..bb7519d9b63 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java @@ -52,6 +52,7 @@ public class FsConfigServiceImpl implements ConfigService { public FsConfigServiceImpl(CelebornConf celebornConf) { this.celebornConf = celebornConf; + this.systemConfigAtomicReference.set(new SystemConfig(celebornConf)); this.refresh(); long dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshInterval(); this.configRefreshService.scheduleWithFixedDelay( @@ -89,8 +90,9 @@ private synchronized void refresh() { } tenantConfigAtomicReference.set(tenantConfs); - systemConfigAtomicReference.set( - systemConfig == null ? new SystemConfig(celebornConf) : systemConfig); + if (systemConfig != null) { + systemConfigAtomicReference.set(systemConfig); + } } @Override