diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 34fcfaca71e..ce1bb317162 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -366,6 +366,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se } } + def dynamicConfigStoreBackend: String = get(DYNAMIC_CONFIG_STORE_BACKEND) + def dynamicConfigRefreshTime: Long = get(DYNAMIC_CONFIG_REFRESH_TIME) + // ////////////////////////////////////////////////////// // Network // // ////////////////////////////////////////////////////// @@ -4053,4 +4056,21 @@ object CelebornConf extends Logging { .doc("Kerberos keytab file path for HDFS storage connection.") .stringConf .createOptional + + val DYNAMIC_CONFIG_STORE_BACKEND: ConfigEntry[String] = + buildConf("celeborn.dynamicConfig.store.backend") + .categories("master", "worker") + .doc("Store backend for dynamic config, NONE means disabling dynamic config store") + .version("0.4.0") + .stringConf + .checkValues(Set("FS", "NONE")) + .createWithDefault("NONE") + + val DYNAMIC_CONFIG_REFRESH_TIME: ConfigEntry[Long] = + buildConf("celeborn.dynamicConfig.refresh.time") + .categories("master", "worker") + .version("0.4.0") + .doc("The time interval for refreshing the corresponding dynamic config periodically") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("120s") } diff --git a/conf/dynamicConfig.yaml.template b/conf/dynamicConfig.yaml.template new file mode 100644 index 00000000000..87789754bb3 --- /dev/null +++ b/conf/dynamicConfig.yaml.template @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- level: SYSTEM + config: + celeborn.worker.directMemoryRatioToPauseReceive: 0.75 + + +- tenantId: tenant_id + level: TENANT + config: + + + diff --git a/docs/configuration/master.md b/docs/configuration/master.md index c21a76edde5..025e9c15b3c 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -19,6 +19,8 @@ license: | | Key | Default | Description | Since | | --- | ------- | ----------- | ----- | +| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically | 0.4.0 | +| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config, NONE means disabling dynamic config store | 0.4.0 | | celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial delay time before start updating partition size for estimation. | 0.3.0 | | celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of updating partition size for estimation. | 0.3.0 | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 1570d124991..a40cc9c5405 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -19,6 +19,8 @@ license: | | Key | Default | Description | Since | | --- | ------- | ----------- | ----- | +| celeborn.dynamicConfig.refresh.time | 120s | The time interval for refreshing the corresponding dynamic config periodically | 0.4.0 | +| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic config, NONE means disabling dynamic config store | 0.4.0 | | celeborn.master.endpoints | <localhost>:9097 | Endpoints of master nodes for celeborn client to connect, allowed pattern is: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. | 0.2.0 | | celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 | diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigLevel.java b/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigLevel.java new file mode 100644 index 00000000000..12182146187 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigLevel.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.service.config; + +public enum ConfigLevel { + SYSTEM, + TENANT, +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigService.java b/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigService.java new file mode 100644 index 00000000000..362d2b71e40 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/service/config/ConfigService.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.service.config; + +public interface ConfigService { + + SystemConfig getSystemConfig(); + + TenantConfig getRawTenantConfig(String tenantId); + + default DynamicConfig getTenantConfig(String tenantId) { + TenantConfig tenantConfig = getRawTenantConfig(tenantId); + if (tenantConfig == null || tenantConfig.getConfigs().isEmpty()) { + return getSystemConfig(); + } else { + return tenantConfig; + } + } + + void refreshAllCache(); + + void shutdown(); + +} \ No newline at end of file diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfig.java b/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfig.java new file mode 100644 index 00000000000..9a051eea7c8 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfig.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.service.config; + +import org.apache.celeborn.common.internal.config.ConfigEntry; +import org.apache.celeborn.common.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Dynamic configuration is a type of configuration that can be changed at runtime as needed. It can be used at system level/tenant level. + * When applying dynamic configuration, the priority order is as follows: tenant level overrides system level, + * which in turn overrides static configuration(CelebornConf). This means that if a configuration is defined at the tenant level, + * it will be used instead of the system level or static configuration(CelebornConf). If the tenant-level configuration is missing, + * the system-level configuration will be used. If the system-level configuration is also missing, CelebornConf + * will be used as the default value. + */ +public abstract class DynamicConfig { + private static final Logger LOG = LoggerFactory.getLogger(DynamicConfig.class); + protected Map configs = new HashMap<>(); + + public abstract DynamicConfig getParentLevelConfig(); + + public T getWithDefaultValue( + String configKey, T defaultValue, Class finalType, ConfigType configType) { + String configValue = configs.get(configKey); + T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null; + if (formatValue == null) { + return defaultValue; + } else { + return formatValue; + } + } + + public T getValue(String configKey, ConfigEntry configEntry, Class finalType, ConfigType configType) { + String configValue = configs.get(configKey); + T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null; + if (formatValue == null) { + DynamicConfig parentLevelConfig = getParentLevelConfig(); + return parentLevelConfig != null? parentLevelConfig.getValue(configKey, configEntry, finalType, configType): null; + } else { + return formatValue; + } + } + + public T formatValue(String configKey, String configValue, Class finalType, ConfigType configType) { + try { + if (configValue != null) { + if (ConfigType.BYTES == configType) { + return convert(finalType, String.valueOf(Utils.byteStringAsBytes(configValue))); + } else if (ConfigType.TIME_MS == configType) { + return convert(finalType, String.valueOf(Utils.timeStringAsMs(configValue))); + } else { + return convert(finalType, configValue); + } + } + } catch (Exception e) { + LOG.warn("Config {} value format is not valid, refer to parent if exist", configKey, e); + } + return null; + } + + public Map getConfigs() { + return configs; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("DynamicConfig{"); + sb.append("configs=").append(configs); + sb.append('}'); + return sb.toString(); + } + + public enum ConfigType { + BYTES, + STRING, + TIME_MS, + } + + public static T convert(Class clazz, String value) { + if (Boolean.TYPE == clazz) { + return (T) Boolean.valueOf(value); + } else if (Byte.TYPE == clazz) { + return (T) Byte.valueOf(value); + } else if (Short.TYPE == clazz) { + return (T) Short.valueOf(value); + } else if (Integer.TYPE == clazz) { + return (T) Integer.valueOf(value); + } else if (Long.TYPE == clazz) { + return (T) Long.valueOf(value); + } else if (Float.TYPE == clazz) { + return (T) Float.valueOf(value); + } else if (Double.TYPE == clazz) { + return (T) Double.valueOf(value); + } + return (T) value; + } +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java b/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java new file mode 100644 index 00000000000..7346ddc20f9 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/service/config/DynamicConfigServiceFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.service.config; + +import org.apache.celeborn.common.CelebornConf; + +public class DynamicConfigServiceFactory { + + public static ConfigService getConfigService(CelebornConf celebornConf) { + String configStoreBackend = celebornConf.dynamicConfigStoreBackend(); + if ("FS".equals(configStoreBackend)) { + return new FsConfigServiceImpl(celebornConf); + } + + return null; + } +} \ No newline at end of file diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java b/service/src/main/scala/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java new file mode 100644 index 00000000000..7a9b60a88b7 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.service.config; + +import org.apache.celeborn.common.util.ThreadUtils; + +import java.io.File; +import java.io.FileInputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import org.apache.celeborn.common.CelebornConf; + +import scala.concurrent.duration.Duration; + +public class FsConfigServiceImpl implements ConfigService { + private static final Logger LOG = LoggerFactory.getLogger(FsConfigServiceImpl.class); + private CelebornConf celebornConf; + private final AtomicReference systemConfigAtomicReference = new AtomicReference<>(); + private final AtomicReference> tenantConfigAtomicReference = new AtomicReference<>(new HashMap<>()); + private static final String CONF_TENANT_ID = "tenantId"; + private static final String CONF_LEVEL = "level"; + private static final String CONF_CONFIG = "config"; + + private final ScheduledExecutorService configRefreshService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("config-refresh-service"); + + public FsConfigServiceImpl(CelebornConf celebornConf) { + this.celebornConf = celebornConf; + this.refresh(); + long dynamicConfigRefreshTime = celebornConf.dynamicConfigRefreshTime(); + this.configRefreshService.scheduleWithFixedDelay( + () -> refresh(), + dynamicConfigRefreshTime, + dynamicConfigRefreshTime, + TimeUnit.MILLISECONDS); + } + + private synchronized void refresh() { + File configurationFile = getConfigurationFile(System.getenv()); + if (!configurationFile.exists()) { + return; + } + + SystemConfig systemConfig = null; + Map tenantConfs = new HashMap<>(); + try (FileInputStream fileInputStream = new FileInputStream(configurationFile)) { + Yaml yaml = new Yaml(); + List> dynamicConfigs = yaml.load(fileInputStream); + for (Map settings : dynamicConfigs) { + String tenantId = (String) settings.get(CONF_TENANT_ID); + String level = (String) settings.get(CONF_LEVEL); + Map config = + ((Map) settings.get(CONF_CONFIG)) + .entrySet().stream() + .collect(Collectors.toMap(a -> a.getKey(), a -> a.getValue().toString())); + if (ConfigLevel.TENANT.name().equals(level)) { + TenantConfig tenantConfig = new TenantConfig(this, tenantId, config); + tenantConfs.put(tenantId, tenantConfig); + } else { + systemConfig = new SystemConfig(celebornConf, config); + } + } + } catch (Exception e) { + LOG.warn("Refresh dynamic config error: {}", e.getMessage(), e); + } + + tenantConfigAtomicReference.set(tenantConfs); + systemConfigAtomicReference.set(systemConfig == null ? new SystemConfig(celebornConf) : systemConfig); + } + + @Override + public SystemConfig getSystemConfig() { + return systemConfigAtomicReference.get(); + } + + @Override + public TenantConfig getRawTenantConfig(String tenantId) { + return tenantConfigAtomicReference.get().get(tenantId); + } + + @Override + public void refreshAllCache() { + this.refresh(); + } + + @Override + public void shutdown() { + ThreadUtils.shutdown(configRefreshService, Duration.apply("800ms")); + } + + private File getConfigurationFile(Map env) { + if (!this.celebornConf.quotaConfigurationPath().isEmpty()) { + return new File(this.celebornConf.quotaConfigurationPath().get()); + } else { + String dynamicConfPath = + Optional.ofNullable(env.get("CELEBORN_CONF_DIR")) + .orElse(env.getOrDefault("CELEBORN_HOME", ".") + File.separator + "conf"); + return new File(dynamicConfPath + File.separator + "dynamicConfig.yaml"); + } + } +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/SystemConfig.java b/service/src/main/scala/org/apache/celeborn/server/common/service/config/SystemConfig.java new file mode 100644 index 00000000000..ff73bb7f2c2 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/service/config/SystemConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.service.config; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.internal.config.ConfigEntry; + +import java.util.HashMap; +import java.util.Map; + +public class SystemConfig extends DynamicConfig { + private CelebornConf celebornConf; + public SystemConfig(CelebornConf celebornConf, Map configs) { + this.celebornConf = celebornConf; + this.configs.putAll(configs); + } + + public SystemConfig(CelebornConf celebornConf) { + this.celebornConf = celebornConf; + this.configs = new HashMap<>(); + } + + @Override + public DynamicConfig getParentLevelConfig() { + return null; + } + + public T getValue(String configKey, ConfigEntry configEntry, Class finalType, ConfigType configType) { + String configValue = configs.get(configKey); + T formatValue = configValue != null ? formatValue(configKey, configValue, finalType, configType) : null; + if (formatValue == null && configEntry != null) { + return convert(finalType, celebornConf.get(configEntry).toString()); + } else { + return formatValue; + } + } +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/service/config/TenantConfig.java b/service/src/main/scala/org/apache/celeborn/server/common/service/config/TenantConfig.java new file mode 100644 index 00000000000..26198d6e0b0 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/service/config/TenantConfig.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.service.config; + +import java.util.Map; + +public class TenantConfig extends DynamicConfig { + private String tenantId; + private ConfigService configService; + + public TenantConfig(ConfigService configService, String tenantId, Map configs) { + this.configService = configService; + this.configs.putAll(configs); + this.tenantId = tenantId; + } + + public Map getConfigs() { + return configs; + } + + @Override + public DynamicConfig getParentLevelConfig() { + return configService.getSystemConfig(); + } +} diff --git a/service/src/test/resources/dynamicConfig.yaml b/service/src/test/resources/dynamicConfig.yaml new file mode 100644 index 00000000000..22d17ee6245 --- /dev/null +++ b/service/src/test/resources/dynamicConfig.yaml @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- level: SYSTEM + config: + celeborn.client.push.buffer.initial.size: 100k + celeborn.client.push.buffer.max.size: 1000k + celeborn.worker.fetch.heartbeat.enabled: true + celeborn.client.push.buffer.initial.size.only: 10k + celeborn.test.timeoutMs.only: 100s + celeborn.test.enabled.only: false + celeborn.test.int.only: 10 + +- tenantId: tenant_id + level: TENANT + config: + celeborn.client.push.buffer.initial.size: 10k + celeborn.client.push.buffer.initial.size.only: 100k + celeborn.worker.fetch.heartbeat.enabled: false + celeborn.test.tenant.timeoutMs.only: 100s + celeborn.test.tenant.enabled.only: false + celeborn.test.tenant.int.only: 10 + diff --git a/service/src/test/resources/dynamicConfig_2.yaml b/service/src/test/resources/dynamicConfig_2.yaml new file mode 100644 index 00000000000..57645e0b839 --- /dev/null +++ b/service/src/test/resources/dynamicConfig_2.yaml @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- level: SYSTEM + config: + celeborn.test.int.only: 100 + diff --git a/service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java b/service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java new file mode 100644 index 00000000000..47c41b01171 --- /dev/null +++ b/service/src/test/scala/org/apache/celeborn/server/common/service/config/ConfigServiceSuiteJ.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.service.config; + +import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.server.common.service.config.ConfigService; +import org.apache.celeborn.server.common.service.config.DynamicConfig; +import org.apache.celeborn.server.common.service.config.DynamicConfig.ConfigType; +import org.apache.celeborn.server.common.service.config.FsConfigServiceImpl; +import org.apache.celeborn.server.common.service.config.SystemConfig; +import org.junit.Assert; +import org.junit.Test; + +public class ConfigServiceSuiteJ { + + @Test + public void testFsConfig() { + CelebornConf celebornConf = new CelebornConf(); + String file = getClass().getResource("/dynamicConfig.yaml").getFile(); + celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file); + celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_TIME(), 5l); + FsConfigServiceImpl fsConfigService = new FsConfigServiceImpl(celebornConf); + + verifyConfig(fsConfigService); + + // change -> refresh config + file = getClass().getResource("/dynamicConfig_2.yaml").getFile(); + celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file); + + fsConfigService.refreshAllCache(); + SystemConfig systemConfig = fsConfigService.getSystemConfig(); + + // verify systemConfig's intConf + Integer intConfValue = systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING); + Assert.assertEquals(intConfValue.intValue(), 100); + + // verify systemConfig's bytesConf -- defer to celebornConf + Long value = systemConfig.getValue(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1073741824); + } + + public void verifyConfig(ConfigService configService) { + // ------------- Verify SystemConfig ----------------- // + SystemConfig systemConfig = configService.getSystemConfig(); + // verify systemConfig's bytesConf -- use systemConfig + Long value = systemConfig.getValue(CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(), Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 102400); + + // verify systemConfig's bytesConf -- defer to celebornConf + value = systemConfig.getValue(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1073741824); + + // verify systemConfig's bytesConf only -- use systemConfig + value = systemConfig.getValue("celeborn.client.push.buffer.initial.size.only", null, Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 10240); + + // verify systemConfig's bytesConf with none + value = systemConfig.getValue("celeborn.client.push.buffer.initial.size.only.none", null, Long.TYPE, ConfigType.BYTES); + Assert.assertNull(value); + + // verify systemConfig's timesConf + value = systemConfig.getValue("celeborn.test.timeoutMs.only", null, Long.TYPE, ConfigType.TIME_MS); + Assert.assertEquals(value.longValue(), 100000); + + // verify systemConfig's BooleanConf + Boolean booleanConfValue = systemConfig.getValue("celeborn.test.timeoutMs.only", null, Boolean.TYPE, ConfigType.STRING); + Assert.assertFalse(booleanConfValue); + + // verify systemConfig's intConf + Integer intConfValue = systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING); + Assert.assertEquals(intConfValue.intValue(), 10); + + // ------------- Verify TenantConfig ----------------- // + DynamicConfig tenantConfig = configService.getTenantConfig("tenant_id"); + // verify tenantConfig's bytesConf -- use tenantConf + value = tenantConfig.getValue(CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(), Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 10240); + + // verify tenantConfig's bytesConf -- defer to systemConf + value = tenantConfig.getValue(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(), Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1024000); + + // verify tenantConfig's bytesConf -- defer to celebornConf + value = tenantConfig.getValue(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(), CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(), Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1073741824); + + // verify tenantConfig's bytesConf only -- use tenantConf + value = tenantConfig.getValue("celeborn.client.push.buffer.initial.size.only", null, Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 102400); + + // verify tenantConfig's bytesConf with none + value = tenantConfig.getValue("celeborn.client.push.buffer.initial.size.only.none", null, Long.TYPE, ConfigType.BYTES); + Assert.assertNull(value); + + DynamicConfig tenantConfigNone = configService.getTenantConfig("tenant_id_none"); + // verify tenantConfig's bytesConf -- defer to systemConf + value = tenantConfigNone.getValue(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(), Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 1024000); + + + // ------------- Verify with defaultValue ----------------- // + value = tenantConfig.getWithDefaultValue("celeborn.client.push.buffer.initial.size.only", 100l, Long.TYPE, ConfigType.BYTES); + Assert.assertEquals(value.longValue(), 102400); + + Long withDefaultValue = tenantConfigNone.getWithDefaultValue("none", 10l, Long.TYPE, ConfigType.STRING); + Assert.assertEquals(withDefaultValue.longValue(), 10); + } +} \ No newline at end of file