From 1857335e83aef43d93eec50a140aac945f834eaa Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Sat, 27 Apr 2024 14:47:22 +0200 Subject: [PATCH] feat(schema-registry): add SSL support for SchemaRegistry (#425) --- .../jikkou/common/utils/Enums.java | 21 ++++ docker-compose-ssl.yml | 114 ++++++++++++++++++ .../jikkou/http/client/RestClientBuilder.java | 101 ++++++++++++++++ .../jikkou/http/client/SslConfigSupport.java | 79 ++++++++++++ .../jikkou/http/client/ssl/SSLConfig.java | 30 +++++ .../http/client/ssl/SSLContextFactory.java | 44 +++++++ .../jikkou/http/client/ssl/SSLUtils.java | 84 +++++++++++++ .../schema/registry/api/AuthMethod.java | 14 +-- .../api/SchemaRegistryApiFactory.java | 4 + .../api/SchemaRegistryClientConfig.java | 42 ++++--- 10 files changed, 503 insertions(+), 30 deletions(-) create mode 100644 docker-compose-ssl.yml create mode 100644 extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/SslConfigSupport.java create mode 100644 extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLConfig.java create mode 100644 extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLContextFactory.java create mode 100644 extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLUtils.java diff --git a/core/src/main/java/io/streamthoughts/jikkou/common/utils/Enums.java b/core/src/main/java/io/streamthoughts/jikkou/common/utils/Enums.java index fae6905c2..7b9de8a80 100644 --- a/core/src/main/java/io/streamthoughts/jikkou/common/utils/Enums.java +++ b/core/src/main/java/io/streamthoughts/jikkou/common/utils/Enums.java @@ -19,6 +19,27 @@ */ public final class Enums { + /** + * Gets the enum for specified string name. + * + * @param value The enum raw value. + * @param enumType The enum class type. + * @param The enum type. + * @return The Enum. + * @throws IllegalArgumentException if no enum exists for the specified value. + */ + public static > T getForNameIgnoreCase(final @Nullable String value, + final @NotNull Class enumType, + final T defaultValue) { + if (value == null) throw new IllegalArgumentException("Unsupported value 'null'"); + + T[] values = enumType.getEnumConstants(); + return Arrays.stream(values) + .filter(e -> e.name().equals(value.toUpperCase(Locale.ROOT))) + .findFirst() + .orElse(defaultValue); + } + /** * Gets the enum for specified string name. * diff --git a/docker-compose-ssl.yml b/docker-compose-ssl.yml new file mode 100644 index 000000000..94b62372f --- /dev/null +++ b/docker-compose-ssl.yml @@ -0,0 +1,114 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) The original authors +# +# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 +# +version: '3.8' +services: + kafka: + image: "confluentinc/cp-kafka:7.5.0" + ports: + - "9092:9092" + container_name: kafka + environment: + KAFKA_NODE_ID: 101 + # random cluster ID used for formatting LOG_DIR for KRaft + CLUSTER_ID: 'xtzWWN4bTjitpL3kfd9s5g' + KAFKA_CONTROLLER_QUORUM_VOTERS: '101@kafka:29093' + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://kafka:29093' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092 + CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 + CONFLUENT_METRICS_ENABLE: 'false' + KAFKA_AUTHORIZER_CLASS_NAME: org.apache.kafka.metadata.authorizer.StandardAuthorizer + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_SUPER_USERS: 'User:anonymous' + networks: + - kafka-platform + + schema-registry: + image: confluentinc/cp-schema-registry:7.5.0 + hostname: schema-registry + container_name: schema-registry + ports: + - "8081:8081" + - "8082:8082" + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092 + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_LISTENERS: https://0.0.0.0:8081,http://0.0.0.0:8082 + SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: "/app/certs/kafka.keystore.jks" + SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: "password" + SCHEMA_REGISTRY_SSL_KEYSTORE_TYPE: "JKS" + SCHEMA_REGISTRY_SSL_KEY_PASSWORD: "password" + SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: "/app/certs/kafka.truststore.jks" + SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: "password" + SCHEMA_REGISTRY_SSL_TRUSTSTORE_TYPE: "JKS" + SCHEMA_REGISTRY_SSL_CLIENT_AUTH: "true" + volumes: + - ./certs/:/app/certs + depends_on: + - kafka + networks: + - kafka-platform + akhq: + image: tchiotludo/akhq + hostname: akhq + container_name: akhq + depends_on: + - kafka + ports: + - "8087:8080" + environment: + AKHQ_CONFIGURATION: | + akhq: + connections: + docker-kafka-server: + properties: + bootstrap.servers: "kafka:29092" + schema-registry: + url: "http://schema-registry:8081" + connect: + - name: "connect" + url: "http://connect:8083" + networks: + - kafka-platform + + connect: + image: confluentinc/cp-kafka-connect:7.5.0 + container_name: connect + depends_on: + - kafka + ports: + - "8083:8083" + - "8000:8000" + environment: + CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092' + CONNECT_REST_ADVERTISED_HOST_NAME: connect + CONNECT_REST_PORT: 8083 + CONNECT_GROUP_ID: kafka-connect + CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 + CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_TOPIC: _connect-status + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" + CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" + CONNECT_PLUGIN_PATH: "/usr/local/share/kafka/plugins,/usr/share/filestream-connectors" + networks: + - kafka-platform + +networks: + kafka-platform: + driver: bridge diff --git a/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/RestClientBuilder.java b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/RestClientBuilder.java index 7a1632261..2899196d8 100644 --- a/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/RestClientBuilder.java +++ b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/RestClientBuilder.java @@ -7,17 +7,27 @@ package io.streamthoughts.jikkou.http.client; import com.fasterxml.jackson.databind.ObjectMapper; +import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException; import io.streamthoughts.jikkou.core.io.Jackson; import io.streamthoughts.jikkou.http.client.internal.ProxyInvocationHandler; +import io.streamthoughts.jikkou.http.client.ssl.SSLConfig; +import io.streamthoughts.jikkou.http.client.ssl.SSLContextFactory; +import io.streamthoughts.jikkou.http.client.ssl.SSLUtils; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.ClientBuilder; import jakarta.ws.rs.client.WebTarget; import jakarta.ws.rs.core.MultivaluedHashMap; import jakarta.ws.rs.ext.ContextResolver; import jakarta.ws.rs.ext.Provider; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -27,10 +37,17 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.jackson.internal.jackson.jaxrs.json.JacksonJsonProvider; import org.glassfish.jersey.logging.LoggingFeature; import org.jetbrains.annotations.NotNull; +import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; /** @@ -38,6 +55,10 @@ */ public class RestClientBuilder { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(RestClientBuilder.class); + + public static final AllowAllHostNameVerifier NO_HOST_NAME_VERIFIER = new AllowAllHostNameVerifier(); + private URI baseUri; private boolean followRedirects; @@ -48,6 +69,8 @@ public class RestClientBuilder { private final ClientBuilder clientBuilder; + private SSLContext sslContext; + private ObjectMapper objectMapper = Jackson.JSON_OBJECT_MAPPER; /** @@ -100,6 +123,31 @@ public RestClientBuilder baseUrl(URL url) { } } + /** + * Sets the truststore. + * + * @return {@code this}. + */ + public RestClientBuilder truststore(KeyStore keyStore) { + clientBuilder.trustStore(keyStore); + return this; + } + + /** + * Sets the keystore. + * + * @return {@code this}. + */ + public RestClientBuilder keystore(KeyStore keyStore, String password) { + clientBuilder.keyStore(keyStore, password); + return this; + } + + public RestClientBuilder sslIgnoreHostnameVerification() { + clientBuilder.hostnameVerifier(NO_HOST_NAME_VERIFIER); + return this; + } + /** * Sets the connect timeout. * @@ -166,6 +214,44 @@ public RestClientBuilder objectMapper(final ObjectMapper objectMapper) { return this; } + public RestClientBuilder sslConfig(final SSLConfig sslConfig) { + TrustManager[] trustManagers; + try { + trustManagers = SSLUtils.createTrustManagers( + sslConfig.trustStoreLocation(), + sslConfig.trustStorePassword().toCharArray(), + sslConfig.trustStoreType(), + KeyManagerFactory.getDefaultAlgorithm() + ); + } catch (CertificateException | + NoSuchAlgorithmException | + KeyStoreException | + IOException e) { + LOG.error("Could not create trust managers for Client Certificate authentication.", e); + throw new JikkouRuntimeException(e); + } + KeyManager[] keyManagers; + try { + keyManagers = SSLUtils.createKeyManagers( + sslConfig.keyStoreLocation(), + sslConfig.keyStorePassword().toCharArray(), + sslConfig.keyStoreType(), + KeyManagerFactory.getDefaultAlgorithm() + ); + } catch (CertificateException | + NoSuchAlgorithmException | + UnrecoverableKeyException | + KeyStoreException | + IOException e) { + LOG.error("Could not create key managers for Client Certificate authentication.", e); + throw new JikkouRuntimeException(e); + } + SSLContextFactory sslContextFactory = new SSLContextFactory(); + clientBuilder.sslContext(sslContextFactory.getSSLContext(keyManagers, trustManagers)); + + return sslConfig.ignoreHostnameVerification() ? sslIgnoreHostnameVerification() : this; + } + /** * Builds a new client for the given resource interface. * @@ -225,4 +311,19 @@ public ObjectMapper getContext(Class type) { return mapper; } } + + /** + * A {@link HostnameVerifier} that accept all certificates. + */ + public static class AllowAllHostNameVerifier implements HostnameVerifier { + + /** + * {@inheritDoc} + */ + @Override + public boolean verify(final String hostname, final SSLSession sslSession) { + return true; + } + } + } diff --git a/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/SslConfigSupport.java b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/SslConfigSupport.java new file mode 100644 index 000000000..a2b60f1a9 --- /dev/null +++ b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/SslConfigSupport.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.jikkou.http.client; + +import io.streamthoughts.jikkou.core.config.ConfigProperty; +import io.streamthoughts.jikkou.core.config.Configuration; +import io.streamthoughts.jikkou.http.client.ssl.SSLConfig; + +public interface SslConfigSupport { + + static ConfigProperty sslKeyStoreLocation(final String configNamespace) { + return ConfigProperty + .ofString(configNamespace + ".sslKeyStoreLocation") + .description("The location of the key store file."); + } + + static ConfigProperty sslKeyStoreType(final String configNamespace) { + return ConfigProperty + .ofString(configNamespace + ".sslKeyStoreType") + .description("The file format of the key store file.") + .orElse("PKCS12"); + } + + static ConfigProperty sslKeyStorePassword(final String configNamespace) { + return ConfigProperty + .ofString(configNamespace + ".sslKeyStorePassword") + .description("The password for the key store file."); + } + + static ConfigProperty sslKeyPassword(final String configNamespace) { + return ConfigProperty + .ofString(configNamespace + ".sslKeyPassword") + .description("The password of the private key in the key store file."); + } + + static ConfigProperty sslTrustStoreLocation(final String configNamespace) { + return ConfigProperty + .ofString(configNamespace + ".sslTrustStoreLocation") + .description("The location of the trust store file."); + } + + static ConfigProperty sslTrustStoreType(final String configNamespace) { + return ConfigProperty + .ofString(configNamespace + ".sslTrustStoreType") + .description("The file format of the trust store file.") + .orElse("PKCS12"); + } + + static ConfigProperty sslTrustStorePassword(final String configNamespace) { + return ConfigProperty + .ofString(configNamespace + ".sslTrustStorePassword") + .description("The password for the trust store file."); + } + + static ConfigProperty sslIgnoreHostnameVerification(final String configNamespace) { + return ConfigProperty + .ofBoolean(configNamespace + ".sslIgnoreHostnameVerification") + .description("Specifies whether to ignore the hostname verification.") + .orElse(false); + } + + static SSLConfig getSslConfig(final String configNamespace, + final Configuration configuration) { + return new SSLConfig( + SslConfigSupport.sslKeyStoreLocation(configNamespace).getOptional(configuration).orElse(null), + SslConfigSupport.sslKeyStorePassword(configNamespace).getOptional(configuration).orElse(null), + SslConfigSupport.sslKeyStoreType(configNamespace).get(configuration), + SslConfigSupport.sslKeyPassword(configNamespace).getOptional(configuration).orElse(null), + SslConfigSupport.sslTrustStoreLocation(configNamespace).getOptional(configuration).orElse(null), + SslConfigSupport.sslTrustStorePassword(configNamespace).getOptional(configuration).orElse(null), + SslConfigSupport.sslTrustStoreType(configNamespace).get(configuration), + SslConfigSupport.sslIgnoreHostnameVerification(configNamespace).get(configuration) + ); + } +} diff --git a/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLConfig.java b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLConfig.java new file mode 100644 index 000000000..700ea908a --- /dev/null +++ b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLConfig.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.jikkou.http.client.ssl; + +/** + * SSL Configs. + * + * @param keyStoreLocation + * @param keyStorePassword + * @param keyStoreType + * @param keyPassword + * @param trustStoreLocation + * @param trustStorePassword + * @param trustStoreType + */ +public record SSLConfig( + String keyStoreLocation, + String keyStorePassword, + String keyStoreType, + String keyPassword, + String trustStoreLocation, + String trustStorePassword, + String trustStoreType, + boolean ignoreHostnameVerification +) { +} \ No newline at end of file diff --git a/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLContextFactory.java b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLContextFactory.java new file mode 100644 index 000000000..0094e02c3 --- /dev/null +++ b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLContextFactory.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.jikkou.http.client.ssl; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default class for retrieving a new {@link SSLContext} instance. + */ +public class SSLContextFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SSLContextFactory.class); + + private static final String PROTOCOL_TLS = "TLS"; + + /** + * Gets the {@link SSLContext} instance using the specified configuration. + * + * @return a new {@link SSLContext} instance. + */ + public SSLContext getSSLContext(final KeyManager[] keyManagers, + final TrustManager[] trustManagers) { + try { + SSLContext sslContext = SSLContext.getInstance(PROTOCOL_TLS); + sslContext.init(keyManagers, trustManagers, new SecureRandom()); + return sslContext; + } catch (NoSuchAlgorithmException + | KeyManagementException e){ + LOG.error("Could not create SSL context for Client Certificate authentication.", e); + throw new RuntimeException(e); + } + } +} diff --git a/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLUtils.java b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLUtils.java new file mode 100644 index 000000000..71c6ed51e --- /dev/null +++ b/extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/ssl/SSLUtils.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.jikkou.http.client.ssl; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; + +/** + * Utility class for SSL. + */ +public final class SSLUtils { + + + public static TrustManager[] createTrustManagers(final String trustStoreLocation, + final char[] trustStorePassword, + final String trustStoreType, + final String defaultAlgorithm) + throws CertificateException, + NoSuchAlgorithmException, + KeyStoreException, + IOException { + KeyStore trustStore = null; + if (trustStoreLocation != null) + trustStore = createKeyStore(trustStoreLocation, trustStorePassword, trustStoreType); + + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(defaultAlgorithm); + // Get default from cacerts if trustStore is null. + trustManagerFactory.init(trustStore); + return trustManagerFactory.getTrustManagers(); + } + + public static KeyManager[] createKeyManagers(final String keyStoreLocation, + final char[] keyPassword, + final String keyStoreType, + final String defaultAlgorithm) + throws UnrecoverableKeyException, + NoSuchAlgorithmException, + KeyStoreException, + CertificateException, + IOException { + final KeyStore keyStore = createKeyStore(keyStoreLocation, keyPassword, keyStoreType); + final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(defaultAlgorithm); + keyManagerFactory.init(keyStore, keyPassword); + return keyManagerFactory.getKeyManagers(); + } + + /** + * Helper method to create a new {@link KeyStore}. + * + * @param keyStoreLocation the location of the Keystore file. + * @param keyStorePassword the password of the Keystore. + * @param keyStoreType the type of the Keystore. + * @return a new {@link KeyStore} instance. + * @throws KeyStoreException + * @throws CertificateException + * @throws NoSuchAlgorithmException + * @throws IOException + */ + public static KeyStore createKeyStore(final String keyStoreLocation, + final char[] keyStorePassword, + final String keyStoreType) + throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException { + try (InputStream is = Files.newInputStream(Paths.get(keyStoreLocation))) { + KeyStore ks = KeyStore.getInstance(keyStoreType); + ks.load(is, keyStorePassword); + return ks; + } + } +} diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AuthMethod.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AuthMethod.java index eb4ad9bc1..b977feb73 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AuthMethod.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AuthMethod.java @@ -6,19 +6,9 @@ */ package io.streamthoughts.jikkou.schema.registry.api; -import java.util.Arrays; -import java.util.Locale; -import org.jetbrains.annotations.NotNull; - public enum AuthMethod { INVALID, BASICAUTH, - NONE; - - public static AuthMethod getForNameIgnoreCase(final @NotNull String str) { - return Arrays.stream(AuthMethod.values()) - .filter(e -> e.name().equals(str.toUpperCase(Locale.ROOT))) - .findFirst() - .orElse(AuthMethod.INVALID); - } + NONE, + SSL; } diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryApiFactory.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryApiFactory.java index 4e2ee04ac..e4a1c003d 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryApiFactory.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryApiFactory.java @@ -41,6 +41,10 @@ public static SchemaRegistryApi create(SchemaRegistryClientConfig config) { builder.header("Authorization", buildAuthorizationHeader); yield builder; } + case SSL -> { + builder.sslConfig(config.getSslConfig()); + yield builder; + } case NONE -> builder; case INVALID -> throw new IllegalStateException("Unexpected value: " + config.getAuthMethod()); diff --git a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryClientConfig.java b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryClientConfig.java index 4a958917b..5e8ae3403 100644 --- a/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryClientConfig.java +++ b/providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryClientConfig.java @@ -6,40 +6,42 @@ */ package io.streamthoughts.jikkou.schema.registry.api; +import io.streamthoughts.jikkou.common.utils.Enums; import io.streamthoughts.jikkou.core.config.ConfigProperty; import io.streamthoughts.jikkou.core.config.Configuration; +import io.streamthoughts.jikkou.http.client.SslConfigSupport; +import io.streamthoughts.jikkou.http.client.ssl.SSLConfig; public class SchemaRegistryClientConfig { - public static final String SCHEMA_REGISTRY_CONFIG_PREFIX = "schemaRegistry"; + public static final String CONFIG_NAMESPACE = "schemaRegistry"; public static final ConfigProperty SCHEMA_REGISTRY_URL = ConfigProperty - .ofString(SCHEMA_REGISTRY_CONFIG_PREFIX + ".url") - .description("Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas."); + .ofString(CONFIG_NAMESPACE + ".url") + .description("Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas."); public static final ConfigProperty SCHEMA_REGISTRY_VENDOR_NAME = ConfigProperty - .ofString(SCHEMA_REGISTRY_CONFIG_PREFIX + ".vendor") - .orElse("generic") - .description("The name of the schema registry implementation vendor."); + .ofString(CONFIG_NAMESPACE + ".vendor") + .orElse("generic") + .description("The name of the schema registry implementation vendor."); public static final ConfigProperty SCHEMA_REGISTRY_AUTH_METHOD = ConfigProperty - .ofString(SCHEMA_REGISTRY_CONFIG_PREFIX + ".authMethod") - .orElse(AuthMethod.NONE.name()) - .description("Method to use for authenticating on Schema Registry. Available values are: [none, basicauth]"); + .ofString(CONFIG_NAMESPACE + ".authMethod") + .orElse(AuthMethod.NONE.name()) + .description("Method to use for authenticating on Schema Registry. Available values are: [none, basicauth, ssl]"); public static final ConfigProperty SCHEMA_REGISTRY_BASIC_AUTH_USERNAME = ConfigProperty - .ofString(SCHEMA_REGISTRY_CONFIG_PREFIX + ".basicAuthUser") - .description("Use when 'schemaRegistry.authMethod' is 'basicauth' to specify the username for Authorization Basic header"); + .ofString(CONFIG_NAMESPACE + ".basicAuthUser") + .description("Use when 'schemaRegistry.authMethod' is 'basicauth' to specify the username for Authorization Basic header"); public static final ConfigProperty SCHEMA_REGISTRY_BASIC_AUTH_PASSWORD = ConfigProperty - .ofString(SCHEMA_REGISTRY_CONFIG_PREFIX + ".basicAuthPassword") - .description("Use when 'schemaRegistry.authMethod' is 'basicauth' to specify the password for Authorization Basic header"); + .ofString(CONFIG_NAMESPACE + ".basicAuthPassword") + .description("Use when 'schemaRegistry.authMethod' is 'basicauth' to specify the password for Authorization Basic header"); public static final ConfigProperty SCHEMA_REGISTRY_DEBUG_LOGGING_ENABLED = ConfigProperty - .ofBoolean(SCHEMA_REGISTRY_CONFIG_PREFIX + ".debugLoggingEnabled") - .description("Enable debug logging.") - .orElse(false); - + .ofBoolean(CONFIG_NAMESPACE + ".debugLoggingEnabled") + .description("Enable debug logging.") + .orElse(false); private final Configuration configuration; @@ -61,7 +63,7 @@ public String getSchemaRegistryVendor() { } public AuthMethod getAuthMethod() { - return AuthMethod.getForNameIgnoreCase(SCHEMA_REGISTRY_AUTH_METHOD.get(configuration)); + return Enums.getForNameIgnoreCase(SCHEMA_REGISTRY_AUTH_METHOD.get(configuration), AuthMethod.class, AuthMethod.INVALID); } public String getBasicAuthUsername() { @@ -79,4 +81,8 @@ public String getBasicAuthInfo() { public boolean getDebugLoggingEnabled() { return SCHEMA_REGISTRY_DEBUG_LOGGING_ENABLED.get(configuration); } + + public SSLConfig getSslConfig() { + return SslConfigSupport.getSslConfig(CONFIG_NAMESPACE, configuration); + } }