diff --git a/src/main/java/io/confluent/connect/hdfs/DataWriter.java b/src/main/java/io/confluent/connect/hdfs/DataWriter.java index 26ee8bb06..15e4f8dcb 100644 --- a/src/main/java/io/confluent/connect/hdfs/DataWriter.java +++ b/src/main/java/io/confluent/connect/hdfs/DataWriter.java @@ -238,15 +238,15 @@ public void close() throws IOException { private void configureKerberosAuthentication(Configuration hadoopConfiguration) { SecurityUtil.setAuthenticationMethod( - UserGroupInformation.AuthenticationMethod.KERBEROS, - hadoopConfiguration + UserGroupInformation.AuthenticationMethod.KERBEROS, + hadoopConfiguration ); if (connectorConfig.connectHdfsPrincipal() == null - || connectorConfig.connectHdfsKeytab() == null) { + || connectorConfig.connectHdfsKeytab() == null) { throw new ConfigException( - "Hadoop is using Kerberos for authentication, you need to provide both a connect " - + "principal and the path to the keytab of the principal."); + "Hadoop is using Kerberos for authentication, you need to provide both a connect " + + "principal and the path to the keytab of the principal."); } hadoopConfiguration.set("hadoop.security.authentication", "kerberos"); @@ -256,8 +256,8 @@ private void configureKerberosAuthentication(Configuration hadoopConfiguration) String hostname = InetAddress.getLocalHost().getCanonicalHostName(); String namenodePrincipal = SecurityUtil.getServerPrincipal( - connectorConfig.hdfsNamenodePrincipal(), - hostname + connectorConfig.hdfsNamenodePrincipal(), + hostname ); // namenode principal is needed for multi-node hadoop cluster @@ -265,13 +265,13 @@ private void configureKerberosAuthentication(Configuration hadoopConfiguration) hadoopConfiguration.set("dfs.namenode.kerberos.principal", namenodePrincipal); } log.info("Hadoop namenode principal: {}", - hadoopConfiguration.get("dfs.namenode.kerberos.principal")); + hadoopConfiguration.get("dfs.namenode.kerberos.principal")); UserGroupInformation.setConfiguration(hadoopConfiguration); // replace 
the _HOST specified in the principal config to the actual host String principal = SecurityUtil.getServerPrincipal( - connectorConfig.connectHdfsPrincipal(), - hostname + connectorConfig.connectHdfsPrincipal(), + hostname ); UserGroupInformation.loginUserFromKeytab(principal, connectorConfig.connectHdfsKeytab()); final UserGroupInformation ugi = UserGroupInformation.getLoginUser(); @@ -281,24 +281,27 @@ private void configureKerberosAuthentication(Configuration hadoopConfiguration) ticketRenewThread = new Thread(() -> renewKerberosTicket(ugi)); } catch (UnknownHostException e) { throw new ConnectException( - String.format( - "Could not resolve local hostname for Kerberos authentication: %s", - e.getMessage() - ), - e + String.format( + "Could not resolve local hostname for Kerberos authentication: %s", + e.getMessage() + ), + e ); } catch (IOException e) { throw new ConnectException( - String.format("Could not authenticate with Kerberos: %s", e.getMessage()), - e + String.format("Could not authenticate with Kerberos: %s", e.getMessage()), + e ); } - log.info( - "Starting the Kerberos ticket renew thread with period {} ms.", - connectorConfig.kerberosTicketRenewPeriodMs() - ); - ticketRenewThread.start(); + if (connectorConfig.kerberosRefreshTicket()) { + log.info( + "Starting the Kerberos ticket renew thread with period {} ms.", + connectorConfig.kerberosTicketRenewPeriodMs() + ); + ticketRenewThread.start(); + } + } private void initializeHiveServices(Configuration hadoopConfiguration) { diff --git a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java index 88d2dc5fb..8ada988c9 100644 --- a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java @@ -170,6 +170,16 @@ public class HdfsSinkConnectorConfig extends StorageSinkConnectorConfig { private static final ParentValueRecommender AVRO_COMPRESSION_RECOMMENDER =
new ParentValueRecommender(FORMAT_CLASS_CONFIG, AvroFormat.class, AVRO_SUPPORTED_CODECS); + // Controls whether the background Kerberos ticket renewal thread is started. + // When false the connector still logs in from the keytab once, but relies on + // the ticket being renewed externally. + public static final String KERBEROS_REFRESH_TICKET_CONFIG = "hdfs.kerberos.refresh.ticket"; + private static final String KERBEROS_REFRESH_TICKET_DOC = + "Configuration indicating whether the connector should periodically renew " + + "its Kerberos ticket."; + private static final boolean KERBEROS_REFRESH_TICKET_DEFAULT = true; + private static final String KERBEROS_REFRESH_TICKET_DISPLAY = "Kerberos refresh ticket"; + static { STORAGE_CLASS_RECOMMENDER.addValidValues( Arrays.asList(HdfsStorage.class) @@ -339,6 +349,18 @@ public static ConfigDef newConfigDef() { KERBEROS_TICKET_RENEW_PERIOD_MS_DISPLAY, hdfsAuthenticationKerberosDependentsRecommender ); + + configDef.define( + KERBEROS_REFRESH_TICKET_CONFIG, + Type.BOOLEAN, + KERBEROS_REFRESH_TICKET_DEFAULT, + Importance.LOW, + KERBEROS_REFRESH_TICKET_DOC, + group, + ++orderInGroup, + Width.SHORT, + KERBEROS_REFRESH_TICKET_DISPLAY + ); } // Put the storage group(s) last ...
ConfigDef storageConfigDef = StorageSinkConnectorConfig.newConfigDef( @@ -503,6 +525,10 @@ public long kerberosTicketRenewPeriodMs() { return getLong(KERBEROS_TICKET_RENEW_PERIOD_MS_CONFIG); } + public boolean kerberosRefreshTicket() { + return getBoolean(KERBEROS_REFRESH_TICKET_CONFIG); + } + public String logsDir() { return getString(LOGS_DIR_CONFIG); } diff --git a/src/test/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java b/src/test/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java index 5a7666641..ca6678c55 100644 --- a/src/test/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfigTest.java @@ -269,6 +269,21 @@ public void testEmptyTimezoneExceptionMessage() { } } + @Test + public void testKerberosRefreshTicketDefault() { + connectorConfig = new HdfsSinkConnectorConfig(properties); + assertEquals(Boolean.TRUE, + connectorConfig.kerberosRefreshTicket()); + } + + @Test + public void testKerberosRefreshTicket() { + properties.put(HdfsSinkConnectorConfig.KERBEROS_REFRESH_TICKET_CONFIG, "false"); + connectorConfig = new HdfsSinkConnectorConfig(properties); + assertEquals(Boolean.FALSE, + connectorConfig.kerberosRefreshTicket()); + } + @Test public void testRecommendedValues() throws Exception { List expectedStorageClasses = Arrays.asList(HdfsStorage.class);