diff --git kafka-handler/README.md kafka-handler/README.md
index 753e3e36b1..da98c98e85 100644
--- kafka-handler/README.md
+++ kafka-handler/README.md
@@ -213,15 +213,68 @@ GROUP BY
 
 ## Table Properties
 
-| Property | Description | Mandatory | Default |
-|----------|-------------|-----------|---------|
-| kafka.topic | Kafka topic name to map the table to. | Yes | null |
-| kafka.bootstrap.servers | Table property indicating Kafka broker(s) connection string. | Yes | null |
-| kafka.serde.class | Serializer and Deserializer class implementation. | No | org.apache.hadoop.hive.serde2.JsonSerDe |
-| hive.kafka.poll.timeout.ms | Parameter indicating Kafka Consumer poll timeout period in millis. FYI this is independent from internal Kafka consumer timeouts. | No | 5000 (5 Seconds) |
-| hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 |
-| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata. | No | 30000 (30 Seconds) |
-| kafka.write.semantic | Writer semantics, allowed values (AT_LEAST_ONCE, EXACTLY_ONCE) | No | AT_LEAST_ONCE |
+| Property | Description | Mandatory | Default |
+|----------|-------------|-----------|---------|
+| kafka.topic | Kafka topic name to map the table to. | Yes | null |
+| kafka.bootstrap.servers | Table property indicating Kafka broker(s) connection string. | Yes | null |
+| kafka.serde.class | Serializer and Deserializer class implementation. | No | org.apache.hadoop.hive.serde2.JsonSerDe |
+| hive.kafka.poll.timeout.ms | Kafka Consumer poll timeout period, in milliseconds. Note that this is independent of internal Kafka consumer timeouts. | No | 5000 (5 seconds) |
+| hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 |
+| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before the consumer times out fetching Kafka metadata. | No | 30000 (30 seconds) |
+| kafka.write.semantic | Writer semantics; allowed values: AT_LEAST_ONCE, EXACTLY_ONCE. | No | AT_LEAST_ONCE |
+| hive.kafka.ssl.credential.keystore | Location of the credential store holding the SSL passwords, used to keep them out of plaintext table properties. | No | |
+| hive.kafka.ssl.truststore.password | The key in the credential store used to retrieve the truststore password. This is NOT the password itself. | No | |
+| hive.kafka.ssl.keystore.password | The key in the credential store used to retrieve the keystore password. This is NOT the password itself. Only needed for two-way authentication. | No | |
+| hive.kafka.ssl.key.password | The key in the credential store used to retrieve the key password. This is NOT the password itself. Only needed for two-way authentication. | No | |
+| hive.kafka.ssl.truststore.location | The location of the SSL truststore. Must be an HDFS location for queries that spawn jobs; Kafka requires a local file, so it is pulled down automatically. | No | |
+| hive.kafka.ssl.keystore.location | The location of the SSL keystore. Must be an HDFS location for queries that spawn jobs; Kafka requires a local file, so it is pulled down automatically. | No | |
+
+### SSL
+Users can create SSL connections to Kafka via the properties described in the table above.
+These properties are used to retrieve passwords from a credential store so that they do not appear in plaintext in
+the table properties. To ensure security, the credential store should have appropriate permissions applied; queries
+issued by clients that cannot read the credential store will fail.
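+
+For example, the credential store referenced in the example below can be created and locked down with the standard
+Hadoop credential CLI (the aliases and paths here are illustrative, and the command prompts for each password):
+
+```
+hadoop credential create truststore.password -provider jceks://hdfs/tmp/test.jceks
+hadoop credential create keystore.password -provider jceks://hdfs/tmp/test.jceks
+hadoop fs -chmod 400 /tmp/test.jceks
+```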
+
+Normally, the `ssl.truststore.location` and `ssl.keystore.location` would have to be local files. Any query that
+requires a job can instead read these from an HDFS location; the stores are pulled down from HDFS to the local
+machine for this purpose.
+
+The producer and consumer stores are both sourced from the same property, e.g. `hive.kafka.ssl.truststore.location`,
+rather than `kafka.consumer.ssl.truststore.location`.
+
+#### SSL Example
+Table creation is straightforward: create a table as normal and supply the appropriate Kafka configs
+(e.g. `kafka.consumer.security.protocol`) along with the credential store configs
+(e.g. `hive.kafka.ssl.credential.keystore`).
+
+```
+CREATE EXTERNAL TABLE kafka_ssl (
+  `data` STRING
+)
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES (
+  'kafka.topic' = 'test-topic',
+  'kafka.bootstrap.servers' = 'localhost:9093',
+  'hive.kafka.ssl.credential.keystore' = 'jceks://hdfs/tmp/test.jceks',
+  'hive.kafka.ssl.keystore.password' = 'keystore.password',
+  'hive.kafka.ssl.truststore.password' = 'truststore.password',
+  'kafka.consumer.security.protocol' = 'SSL',
+  'hive.kafka.ssl.keystore.location' = 'hdfs://cluster/tmp/keystore.jks',
+  'hive.kafka.ssl.truststore.location' = 'hdfs://cluster/tmp/truststore.jks'
+);
+```
+
+Now we can query the table as normal:
+```
+SELECT * FROM kafka_ssl LIMIT 10;
+```
+
+Our truststore and keystore are located in HDFS, which means we can also run more complex queries that result in
+jobs. These will still connect to Kafka as expected:
+```
+SELECT `data` FROM kafka_ssl WHERE `__offset` > 0 AND `__offset` < 10000 GROUP BY `data`;
+```
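+
+Writes use the same SSL configuration, since the producer is built from the same table properties. A sketch of an
+insert (as with any table backed by this handler, the Kafka metadata columns must be supplied explicitly):
+```
+INSERT INTO TABLE kafka_ssl
+SELECT 'hello kafka', CAST(NULL AS BINARY) AS `__key`, NULL AS `__partition`, -1 AS `__offset`,
+  to_epoch_milli(CURRENT_TIMESTAMP) AS `__timestamp`;
+```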
+
 ### Setting Extra Consumer/Producer properties.
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
index a4ad01a008..3e308e6716 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
@@ -53,7 +53,43 @@
   /**
    * Table property that indicates if we should commit within the task or delay it to the Metadata Hook Commit call.
    */
-  HIVE_KAFKA_OPTIMISTIC_COMMIT("hive.kafka.optimistic.commit", "false");
+  HIVE_KAFKA_OPTIMISTIC_COMMIT("hive.kafka.optimistic.commit", "false"),
+
+  /**
+   * Table property indicating the location of the credential store containing passwords that would otherwise be
+   * exposed in Kafka's SSL parameters.
+   */
+  HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE("hive.kafka.ssl.credential.keystore", ""),
+
+  /**
+   * Table property indicating the key in the credential keystore for the truststore password. This is NOT
+   * the actual password.
+   */
+  HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD("hive.kafka.ssl.truststore.password", ""),
+
+  /**
+   * Table property indicating the key in the credential keystore for the keystore password. This is NOT
+   * the actual password. Only needed for two-way authentication.
+   */
+  HIVE_KAFKA_SSL_KEYSTORE_PASSWORD("hive.kafka.ssl.keystore.password", ""),
+
+  /**
+   * Table property indicating the key in the credential keystore for the key password. This is NOT
+   * the actual password. Only needed for two-way authentication.
+   */
+  HIVE_KAFKA_SSL_KEY_PASSWORD("hive.kafka.ssl.key.password", ""),
+
+  /**
+   * Table property indicating the location of the SSL truststore. Kafka cannot normally use an HDFS-based location,
+   * so the store is pulled down locally for each consumer/producer.
+   */
+  HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG("hive.kafka.ssl.truststore.location", ""),
+
+  /**
+   * Table property indicating the location of the SSL keystore. Kafka cannot normally use an HDFS-based location,
+   * so the store is pulled down locally for each consumer/producer.
+   */
+  HIVE_SSL_KEYSTORE_LOCATION_CONFIG("hive.kafka.ssl.keystore.location", "");
 
   /**
    * Kafka storage handler table properties constructor.
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
index 81252c5936..50b2b4f8a1 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
@@ -35,25 +35,29 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -115,11 +107,71 @@ static Properties consumerProperties(Configuration configuration) {
     if (UserGroupInformation.isSecurityEnabled()) {
       addKerberosJaasConf(configuration, props);
     }
-    // user can always override stuff
+
+    // user can always override stuff, but SSL properties are derived from the configuration because they require
+    // local files. These need to be modified afterwards. This works because these properties use the standard
+    // consumer prefix.
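+    // i.e. any user-supplied kafka.consumer.ssl.* values are applied first, and setupKafkaSslProperties() then
+    // overwrites the store locations and passwords with the localized copies and the values resolved from the
+    // credential store.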
     props.putAll(extractExtraProperties(configuration, CONSUMER_CONFIGURATION_PREFIX));
+    setupKafkaSslProperties(configuration, props);
+
     return props;
   }
 
+  static void setupKafkaSslProperties(Configuration configuration, Properties props) {
+    // Set up SSL via the credential keystore if one is configured.
+    final String credKeystore = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName());
+    if (credKeystore != null && !credKeystore.isEmpty()) {
+      final String truststorePasswdConfig =
+          configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD.getName(), "");
+      final String keystorePasswdConfig =
+          configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEYSTORE_PASSWORD.getName(), "");
+      final String keyPasswdConfig =
+          configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEY_PASSWORD.getName(), "");
+
+      String resourcesDir = HiveConf.getVar(configuration, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR);
+      try {
+        String truststoreLoc = configuration.get(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName());
+        Path truststorePath = new Path(truststoreLoc);
+        props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+            new File(resourcesDir, truststorePath.getName()).getAbsolutePath());
+        writeStoreToLocal(configuration, truststoreLoc, new File(resourcesDir).getAbsolutePath());
+
+        final String truststorePasswd = Utilities.getPasswdFromKeystore(credKeystore, truststorePasswdConfig);
+        props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePasswd);
+
+        // ssl.keystore.password is only needed if two-way authentication is configured.
+        if (!keystorePasswdConfig.isEmpty()) {
+          LOG.info("Kafka keystore configured, configuring local keystore");
+          String keystoreLoc = configuration.get(KafkaTableProperties.HIVE_SSL_KEYSTORE_LOCATION_CONFIG.getName());
+          Path keystorePath = new Path(keystoreLoc);
+          props.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+              new File(resourcesDir, keystorePath.getName()).getAbsolutePath());
+          writeStoreToLocal(configuration, keystoreLoc, new File(resourcesDir).getAbsolutePath());
+
+          final String keystorePasswd = Utilities.getPasswdFromKeystore(credKeystore, keystorePasswdConfig);
+          props.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePasswd);
+        }
+
+        // ssl.key.password is optional for clients.
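+        // The value below is another alias resolved against the same credential store; for many keystores the key
+        // password is the same as the keystore password.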
+        if (!keyPasswdConfig.isEmpty()) {
+          final String keyPasswd = Utilities.getPasswdFromKeystore(credKeystore, keyPasswdConfig);
+          props.setProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPasswd);
+        }
+      } catch (IOException | URISyntaxException e) {
+        throw new IllegalStateException("Unable to set up Kafka SSL properties from the credential keystore", e);
+      }
+    }
+  }
+
+  private static void writeStoreToLocal(Configuration configuration, String storeLoc, String localDest)
+      throws IOException, URISyntaxException {
+    URI uri = new URI(storeLoc);
+    if (!"hdfs".equals(uri.getScheme())) {
+      throw new IllegalArgumentException("Kafka stores must be located in HDFS, but received: " + storeLoc);
+    }
+    // Make sure the local resources directory exists before copying the store down.
+    new File(localDest).mkdirs();
+    FileSystem fs = FileSystem.get(uri, configuration);
+    fs.copyToLocalFile(new Path(uri.toString()), new Path(localDest));
+  }
+
   private static Map<String, String> extractExtraProperties(final Configuration configuration, String prefix) {
     ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
     final Map<String, String> kafkaProperties = configuration.getValByRegex("^" + prefix + "\\..*");
@@ -150,6 +202,8 @@ static Properties producerProperties(Configuration configuration) {
     // user can always override stuff
     properties.putAll(extractExtraProperties(configuration, PRODUCER_CONFIGURATION_PREFIX));
+    setupKafkaSslProperties(configuration, properties);
+
     String taskId = configuration.get("mapred.task.id", null);
     properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG,
         taskId == null ? "random_" + UUID.randomUUID().toString() : taskId);
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
index 640b24e82d..a19adefc71 100644
--- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
@@ -64,6 +64,29 @@ public KafkaUtilsTest() {
     KafkaUtils.consumerProperties(configuration);
   }
 
+  @Test public void testSetupKafkaSslPropertiesNoSslIsUnchanged() {
+    Configuration config = new Configuration();
+    Properties props = new Properties();
+    KafkaUtils.setupKafkaSslProperties(config, props);
+    Assert.assertEquals(new Properties(), props);
+  }
+
+  @Test(expected = IllegalArgumentException.class) public void testSetupKafkaSslPropertiesSslNotInHdfs() {
+    Configuration config = new Configuration();
+    config.set(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(), "nonexistentfile");
+    config.set(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(), "madeup");
+    Properties props = new Properties();
+    KafkaUtils.setupKafkaSslProperties(config, props);
+  }
+
+  @Test(expected = IllegalStateException.class) public void testSetupKafkaSslPropertiesCantRetrieveStore() {
+    Configuration config = new Configuration();
+    config.set(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(), "nonexistentfile");
+    config.set(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(), "hdfs://localhost/tmp/madeup");
+    Properties props = new Properties();
+    KafkaUtils.setupKafkaSslProperties(config, props);
+  }
+
   @Test public void testMetadataEnumLookupMapper() {
     int partition = 1;
     long offset = 5L;