Skip to content

Commit

Permalink
[hotfix-1786][chunjun-metrics-prometheus] Fix uploading of startLocat…
Browse files Browse the repository at this point in the history
…ion and endLocation metrics to PushGateway
  • Loading branch information
Icarus-Alpha committed Apr 12, 2024
1 parent bf9458e commit 1b2b4f3
Showing 1 changed file with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -70,6 +72,8 @@ public class PrometheusReport extends CustomReporter {

private Configuration configuration;

// 如果是 Flink1.16 或更高版本,PrometheusPushGateway 主机名和端口号的配置项变成了 metrics.reporter.promgateway.hostUrl
private static final String KEY_HOST_URL = "metrics.reporter.promgateway.hostUrl";
private static final String KEY_HOST = "metrics.reporter.promgateway.host";
private static final String KEY_PORT = "metrics.reporter.promgateway.port";
private static final String KEY_JOB_NAME = "metrics.reporter.promgateway.jobName";
Expand All @@ -79,7 +83,7 @@ public class PrometheusReport extends CustomReporter {
"metrics.reporter.promgateway.deleteOnShutdown";

private static final char SCOPE_SEPARATOR = '_';
private static final String SCOPE_PREFIX = "org/apache/flink" + SCOPE_SEPARATOR;
private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;

private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, Integer>>
collectorsWithCountByMetricName = new HashMap<>();
Expand Down Expand Up @@ -107,8 +111,22 @@ private void initConfiguration() {

@Override
public void open() {
// 尝试从新的配置项读取,如果未配置则使用旧的配置项
String hostUrl = configuration.getString(KEY_HOST_URL, "");
String host = configuration.getString(KEY_HOST, null);
int port = configuration.getInteger(KEY_PORT, 0);
// 解析 hostUrl 如果有
if (!StringUtils.isNullOrWhitespaceOnly(hostUrl)) {
try {
URL url = new URL(hostUrl);
host = url.getHost();
port = url.getPort();
} catch (MalformedURLException e) {
log.error("Error parsing Prometheus PushGateway URL: {}", hostUrl, e);
return; // URL 格式错误则终止执行
}
}

String configuredJobName = configuration.getString(KEY_JOB_NAME, "jiangboJob");
boolean randomSuffix = configuration.getBoolean(KEY_RANDOM_JOB_NAME_SUFFIX, false);
deleteOnShutdown = configuration.getBoolean(KEY_DELETE_ON_SHUTDOWN, true);
Expand Down

0 comments on commit 1b2b4f3

Please sign in to comment.