diff --git a/chunjun-metrics/chunjun-metrics-prometheus/src/main/java/com/dtstack/chunjun/metrics/prometheus/PrometheusReport.java b/chunjun-metrics/chunjun-metrics-prometheus/src/main/java/com/dtstack/chunjun/metrics/prometheus/PrometheusReport.java index ff8c4ca40d..a5e0fcb74f 100644 --- a/chunjun-metrics/chunjun-metrics-prometheus/src/main/java/com/dtstack/chunjun/metrics/prometheus/PrometheusReport.java +++ b/chunjun-metrics/chunjun-metrics-prometheus/src/main/java/com/dtstack/chunjun/metrics/prometheus/PrometheusReport.java @@ -48,6 +48,8 @@ import java.io.IOException; import java.lang.reflect.Field; +import java.net.MalformedURLException; +import java.net.URL; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -70,6 +72,8 @@ public class PrometheusReport extends CustomReporter { private Configuration configuration; + // 如果是 Flink1.16 或更高版本,PrometheusPushGateway 主机名和端口号的配置项变成了 metrics.reporter.promgateway.hostUrl + private static final String KEY_HOST_URL = "metrics.reporter.promgateway.hostUrl"; private static final String KEY_HOST = "metrics.reporter.promgateway.host"; private static final String KEY_PORT = "metrics.reporter.promgateway.port"; private static final String KEY_JOB_NAME = "metrics.reporter.promgateway.jobName"; @@ -79,7 +83,7 @@ public class PrometheusReport extends CustomReporter { "metrics.reporter.promgateway.deleteOnShutdown"; private static final char SCOPE_SEPARATOR = '_'; - private static final String SCOPE_PREFIX = "org/apache/flink" + SCOPE_SEPARATOR; + private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR; private final Map> collectorsWithCountByMetricName = new HashMap<>(); @@ -107,8 +111,22 @@ private void initConfiguration() { @Override public void open() { + // 尝试从新的配置项读取,如果未配置则使用旧的配置项 + String hostUrl = configuration.getString(KEY_HOST_URL, ""); String host = configuration.getString(KEY_HOST, null); int port = configuration.getInteger(KEY_PORT, 0); + // 解析 hostUrl 如果有 + if (!StringUtils.isNullOrWhitespaceOnly(hostUrl)) { + try { + URL url = new URL(hostUrl); + host = url.getHost(); + port = url.getPort(); + } catch (MalformedURLException e) { + log.error("Error parsing Prometheus PushGateway URL: {}", hostUrl, e); + return; // URL 格式错误则终止执行 + } + } + String configuredJobName = configuration.getString(KEY_JOB_NAME, "jiangboJob"); boolean randomSuffix = configuration.getBoolean(KEY_RANDOM_JOB_NAME_SUFFIX, false); deleteOnShutdown = configuration.getBoolean(KEY_DELETE_ON_SHUTDOWN, true);