Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new property KERBEROS_REFRESH_TICKET for HdfsSinkConnectorConfig #628

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions src/main/java/io/confluent/connect/hdfs/DataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ public void close() throws IOException {

private void configureKerberosAuthentication(Configuration hadoopConfiguration) {
SecurityUtil.setAuthenticationMethod(
UserGroupInformation.AuthenticationMethod.KERBEROS,
hadoopConfiguration
UserGroupInformation.AuthenticationMethod.KERBEROS,
hadoopConfiguration
);

if (connectorConfig.connectHdfsPrincipal() == null
|| connectorConfig.connectHdfsKeytab() == null) {
|| connectorConfig.connectHdfsKeytab() == null) {
throw new ConfigException(
"Hadoop is using Kerberos for authentication, you need to provide both a connect "
+ "principal and the path to the keytab of the principal.");
"Hadoop is using Kerberos for authentication, you need to provide both a connect "
+ "principal and the path to the keytab of the principal.");
}

hadoopConfiguration.set("hadoop.security.authentication", "kerberos");
Expand All @@ -256,22 +256,22 @@ private void configureKerberosAuthentication(Configuration hadoopConfiguration)
String hostname = InetAddress.getLocalHost().getCanonicalHostName();

String namenodePrincipal = SecurityUtil.getServerPrincipal(
connectorConfig.hdfsNamenodePrincipal(),
hostname
connectorConfig.hdfsNamenodePrincipal(),
hostname
);

// namenode principal is needed for multi-node hadoop cluster
if (hadoopConfiguration.get("dfs.namenode.kerberos.principal") == null) {
hadoopConfiguration.set("dfs.namenode.kerberos.principal", namenodePrincipal);
}
log.info("Hadoop namenode principal: {}",
hadoopConfiguration.get("dfs.namenode.kerberos.principal"));
hadoopConfiguration.get("dfs.namenode.kerberos.principal"));

UserGroupInformation.setConfiguration(hadoopConfiguration);
// replace the _HOST specified in the principal config to the actual host
String principal = SecurityUtil.getServerPrincipal(
connectorConfig.connectHdfsPrincipal(),
hostname
connectorConfig.connectHdfsPrincipal(),
hostname
);
UserGroupInformation.loginUserFromKeytab(principal, connectorConfig.connectHdfsKeytab());
final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
Expand All @@ -281,24 +281,27 @@ private void configureKerberosAuthentication(Configuration hadoopConfiguration)
ticketRenewThread = new Thread(() -> renewKerberosTicket(ugi));
} catch (UnknownHostException e) {
throw new ConnectException(
String.format(
"Could not resolve local hostname for Kerberos authentication: %s",
e.getMessage()
),
e
String.format(
"Could not resolve local hostname for Kerberos authentication: %s",
e.getMessage()
),
e
);
} catch (IOException e) {
throw new ConnectException(
String.format("Could not authenticate with Kerberos: %s", e.getMessage()),
e
String.format("Could not authenticate with Kerberos: %s", e.getMessage()),
e
);
}

log.info(
"Starting the Kerberos ticket renew thread with period {} ms.",
connectorConfig.kerberosTicketRenewPeriodMs()
"Starting the Kerberos ticket renew thread with period {} ms.",
connectorConfig.kerberosTicketRenewPeriodMs()
);
ticketRenewThread.start();
if (connectorConfig.kerberosRefreshTicket()) {
ticketRenewThread.start();
}

}

private void initializeHiveServices(Configuration hadoopConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ public class HdfsSinkConnectorConfig extends StorageSinkConnectorConfig {
private static final ParentValueRecommender AVRO_COMPRESSION_RECOMMENDER
= new ParentValueRecommender(FORMAT_CLASS_CONFIG, AvroFormat.class, AVRO_SUPPORTED_CODECS);

//Kerberos renew ticket
public static final String KERBEROS_REFRESH_TICKET_CONFIG = "hdfs.kerberos.refresh.ticket";
private static final String KERBEROS_REFRESH_TICKET_DOC =
"Configuration indicating whether kerberos should refresh ticket or not.";
private static final boolean KERBEROS_REFRESH_TICKET_DEFAULT = true;
private static final String KERBEROS_REFRESH_TICKET_DISPLAY = "Kerberos refresh ticket";
private static final ConfigDef.Recommender KerberosRenewTicketDependentsRecommender =
new BooleanParentRecommender(
KERBEROS_REFRESH_TICKET_CONFIG);

static {
STORAGE_CLASS_RECOMMENDER.addValidValues(
Arrays.asList(HdfsStorage.class)
Expand Down Expand Up @@ -339,6 +349,18 @@ public static ConfigDef newConfigDef() {
KERBEROS_TICKET_RENEW_PERIOD_MS_DISPLAY,
hdfsAuthenticationKerberosDependentsRecommender
);

configDef.define(
KERBEROS_REFRESH_TICKET_CONFIG,
Type.BOOLEAN,
KERBEROS_REFRESH_TICKET_DEFAULT,
Importance.LOW,
KERBEROS_REFRESH_TICKET_DOC,
group,
++orderInGroup,
Width.SHORT,
KERBEROS_REFRESH_TICKET_DISPLAY
);
}
// Put the storage group(s) last ...
ConfigDef storageConfigDef = StorageSinkConnectorConfig.newConfigDef(
Expand Down Expand Up @@ -503,6 +525,10 @@ public long kerberosTicketRenewPeriodMs() {
return getLong(KERBEROS_TICKET_RENEW_PERIOD_MS_CONFIG);
}

/**
 * Returns whether Kerberos ticket refreshing is enabled
 * ({@code hdfs.kerberos.refresh.ticket}, default {@code true}).
 *
 * <p>When {@code true}, the connector starts the background Kerberos
 * ticket renew thread; when {@code false}, the thread is never started.
 *
 * @return the configured value of {@code hdfs.kerberos.refresh.ticket}
 */
public boolean kerberosRefreshTicket() {
return getBoolean(KERBEROS_REFRESH_TICKET_CONFIG);
}

public String logsDir() {
return getString(LOGS_DIR_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,19 @@ public void testEmptyTimezoneExceptionMessage() {
}
}

@Test
public void testKerberosRefreshTicketDefault() {
  // When hdfs.kerberos.refresh.ticket is not set, it must default to enabled.
  connectorConfig = new HdfsSinkConnectorConfig(properties);
  assertEquals(Boolean.TRUE, connectorConfig.kerberosRefreshTicket());
}
@Test
public void testKerberosRefreshTicket() {
  // An explicit "false" for hdfs.kerberos.refresh.ticket must be honored.
  properties.put(HdfsSinkConnectorConfig.KERBEROS_REFRESH_TICKET_CONFIG, "false");
  connectorConfig = new HdfsSinkConnectorConfig(properties);
  assertEquals(Boolean.FALSE, connectorConfig.kerberosRefreshTicket());
}
@Test
public void testRecommendedValues() throws Exception {
List<Object> expectedStorageClasses = Arrays.<Object>asList(HdfsStorage.class);
Expand Down