From 046b02f4ca6f1915aee7eb961d1948439b2f7ee7 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Tue, 28 Apr 2015 00:29:53 -0700 Subject: [PATCH] KAFKA-1690. new java producer needs ssl support as a client. --- checkstyle/checkstyle.xml | 1 - checkstyle/import-control.xml | 14 +- .../java/org/apache/kafka/clients/ClientUtils.java | 14 + .../apache/kafka/clients/CommonClientConfigs.java | 2 + .../kafka/clients/consumer/ConsumerConfig.java | 11 +- .../kafka/clients/consumer/KafkaConsumer.java | 5 +- .../kafka/clients/producer/KafkaProducer.java | 5 +- .../kafka/clients/producer/ProducerConfig.java | 8 +- .../apache/kafka/common/config/SecurityConfig.java | 113 ++++++ .../apache/kafka/common/network/Authenticator.java | 60 +++ .../org/apache/kafka/common/network/Channel.java | 133 +++++++ .../kafka/common/network/DefaultAuthenticator.java | 46 +++ .../common/network/PlainTextTransportLayer.java | 148 +++++++ .../apache/kafka/common/network/SSLFactory.java | 188 +++++++++ .../kafka/common/network/SSLTransportLayer.java | 430 +++++++++++++++++++++ .../apache/kafka/common/network/Selectable.java | 1 + .../org/apache/kafka/common/network/Selector.java | 129 +++++-- .../kafka/common/network/TransportLayer.java | 86 +++++ .../kafka/common/protocol/SecurityProtocol.java | 2 + .../java/org/apache/kafka/common/utils/Utils.java | 11 + .../apache/kafka/common/network/EchoServer.java | 108 ++++++ .../kafka/common/network/SSLSelectorTest.java | 97 +++++ .../apache/kafka/common/network/SelectorTest.java | 79 +--- .../org/apache/kafka/common/utils/UtilsTest.java | 2 + .../java/org/apache/kafka/test/TestSSLUtils.java | 208 ++++++++++ 25 files changed, 1780 insertions(+), 121 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java create mode 100644 clients/src/main/java/org/apache/kafka/common/network/Authenticator.java create mode 100644 clients/src/main/java/org/apache/kafka/common/network/Channel.java create mode 100644 clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java create mode 100644 clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java create mode 100644 clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java create mode 100644 clients/src/test/java/org/apache/kafka/common/network/EchoServer.java create mode 100644 clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java create mode 100644 clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index a215ff3..5fbf562 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -33,7 +33,6 @@ - diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index f2e6cec..e649189 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -26,6 +26,9 @@ + + + @@ -33,14 +36,18 @@ - + + + + + @@ -51,6 +58,9 @@ + + + @@ -73,6 +83,7 @@ + @@ -80,6 +91,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 0d68bf1..54a554f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -13,17 +13,22 @@ package org.apache.kafka.clients; import java.io.Closeable; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SecurityConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; +import static org.apache.kafka.common.utils.Utils.loadProps; + public class ClientUtils { private static final Logger log = LoggerFactory.getLogger(ClientUtils.class); @@ -61,4 +66,13 @@ public class ClientUtils { } } } + + public static SecurityConfig parseSecurityConfig(String securityConfigFile) throws IOException { + Properties securityProps = new Properties(); + if (securityConfigFile == null || securityConfigFile == "") { + return new SecurityConfig(securityProps); + } + securityProps = loadProps(securityConfigFile); + return new SecurityConfig(securityProps); + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index cf32e4e..0b23875 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -55,4 +55,6 @@ public class CommonClientConfigs { public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + public static final String SECURITY_CONFIG_FILE_CONFIG = "security.config.file"; + public static final String SECURITY_CONFIG_FILE_DOC = "Kafka client security related config file."; } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 42c7219..bc5e8a1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -156,6 +156,10 @@ public class ConsumerConfig extends AbstractConfig { public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the Deserializer interface."; + /** security.config.file */ + public static final String SECURITY_CONFIG_FILE_CONFIG = CommonClientConfigs.SECURITY_CONFIG_FILE_CONFIG; + private static final String SECURITY_CONFIG_FILE_DOC = CommonClientConfigs.SECURITY_CONFIG_FILE_DOC; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, @@ -277,7 +281,12 @@ public class ConsumerConfig extends AbstractConfig { .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, - VALUE_DESERIALIZER_CLASS_DOC); + VALUE_DESERIALIZER_CLASS_DOC) + .define(SECURITY_CONFIG_FILE_CONFIG, + Type.STRING, + "", + Importance.MEDIUM, + SECURITY_CONFIG_FILE_DOC); } public static Map addDeserializerToConfig(Map configs, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 09ecb42..e03b9c0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.SecurityConfig; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.MetricName; @@ -361,6 +362,7 @@ public class KafkaConsumer implements Consumer { private final boolean autoCommit; private final long autoCommitIntervalMs; private final ConsumerRebalanceCallback rebalanceCallback; + private final SecurityConfig securityConfig; private long lastCommitAttemptMs; private boolean closed = false; @@ -472,7 +474,8 @@ public class KafkaConsumer implements Consumer { String metricGrpPrefix = "consumer"; Map metricsTags = new LinkedHashMap(); metricsTags.put("client-id", clientId); - this.client = new NetworkClient(new Selector(metrics, time, metricGrpPrefix, metricsTags), + this.securityConfig = ClientUtils.parseSecurityConfig(config.getString(ConsumerConfig.SECURITY_CONFIG_FILE_CONFIG)); + this.client = new NetworkClient(new Selector(metrics, time, metricGrpPrefix, metricsTags, securityConfig), this.metadata, clientId, 100, // a fixed large enough value will suffice diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 42b1292..e924e9f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SecurityConfig; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RecordTooLargeException; @@ -138,6 +139,7 @@ public class KafkaProducer implements Producer { private final Serializer keySerializer; private final Serializer valueSerializer; private final ProducerConfig producerConfig; + private final SecurityConfig securityConfig; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -214,6 +216,7 @@ public class KafkaProducer implements Producer { this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + this.securityConfig = ClientUtils.parseSecurityConfig(config.getString(ProducerConfig.SECURITY_CONFIG_FILE_CONFIG)); Map metricTags = new LinkedHashMap(); metricTags.put("client-id", clientId); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), @@ -228,7 +231,7 @@ public class KafkaProducer implements Producer { List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - NetworkClient client = new NetworkClient(new Selector(this.metrics, time, "producer", metricTags), + NetworkClient client = new NetworkClient(new Selector(this.metrics, time, "producer", metricTags, securityConfig), this.metadata, clientId, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 5a57555..342fc9f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -169,6 +169,11 @@ public class ProducerConfig extends AbstractConfig { public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface."; + /** security.config.file */ + public static final String SECURITY_CONFIG_FILE_CONFIG = CommonClientConfigs.SECURITY_CONFIG_FILE_CONFIG; + private static final String SECURITY_CONFIG_FILE_DOC = CommonClientConfigs.SECURITY_CONFIG_FILE_DOC; + + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -217,7 +222,8 @@ public class ProducerConfig extends AbstractConfig { Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) + .define(SECURITY_CONFIG_FILE_CONFIG, Type.STRING, "", Importance.MEDIUM, SECURITY_CONFIG_FILE_DOC); } public static Map addSerializerToConfig(Map configs, diff --git a/clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java b/clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java new file mode 100644 index 0000000..7954a7e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.config; + + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.protocol.SecurityProtocol; + + +/** + * Security Related config for clients and server. + */ + +public class SecurityConfig extends AbstractConfig { + /* + * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND + * CHANGE WILL BREAK USER CODE. + */ + + private static final ConfigDef CONFIG; + + public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol"; + public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported."; + + public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol"; + public static final String SSL_PROTOCOL_DOC = "The TLS protocol used for broker connections if security protocol is SSL. " + + "Any version of TLS is accepted by default."; + + public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites"; + public static final String SSL_CIPHER_SUITES_DOC = "The list of cipher suites enabled for SSL connections. " + + "Default value is the list of cipher suites enabled for the Java Virtual Machine."; + + public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols"; + public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. " + + "Default value is the list of protocols enabled for the Java Virtual Machine."; + + + public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type"; + public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. " + + "Default value is the default key store format of the Java Virtual Machine."; + + public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"; + public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. " + + "This is optional for Client and can be used for two-way authentication for client."; + + public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password"; + public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file. "; + + + public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password"; + public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. " + + "This is optional for client."; + + public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type"; + public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. " + + "Default value is JKS."; + + public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location"; + public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. "; + + public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password"; + public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. "; + + public static final String SSL_CLIENT_REQUIRE_CERT_CONFIG = "ssl.client.require.cert"; + public static final String SSL_CLIENT_REQUIRE_CERT_DOC = "This is to enforce two-way authentication between client and server." + + "Default value is false. If set to true client need to prover Keystrore releated config"; + + public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm"; + public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. " + + "Default value is the key manager factory algorithm configured for the Java Virtual Machine."; + + public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm"; + public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC = "The algorithm used by trust manager factory for SSL connections. " + + "Default value is the trust manager factory algorithm configured for the Java Virtual Machine."; + + + static { + CONFIG = new ConfigDef().define(SECURITY_PROTOCOL_CONFIG, Type.STRING, SecurityProtocol.PLAINTEXT.toString(), Importance.MEDIUM, SECURITY_PROTOCOL_DOC) + .define(SSL_PROTOCOL_CONFIG, Type.STRING, "TLS", Importance.MEDIUM, SSL_PROTOCOL_DOC) + .define(SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.MEDIUM, SSL_CIPHER_SUITES_DOC, false) + .define(SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, "TLSv1.2, TLSv1.1, TLSv1", Importance.MEDIUM, SSL_ENABLED_PROTOCOLS_DOC) + .define(SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, Importance.MEDIUM, SSL_KEYSTORE_TYPE_DOC, false) + .define(SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.MEDIUM, SSL_KEYSTORE_LOCATION_DOC, false) + .define(SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.MEDIUM, SSL_KEYSTORE_PASSWORD_DOC, false) + .define(SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.MEDIUM, SSL_KEY_PASSWORD_DOC, false) + .define(SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, Importance.MEDIUM, SSL_TRUSTSTORE_TYPE_DOC, false) + .define(SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.MEDIUM, SSL_TRUSTSTORE_LOCATION_DOC, false) + .define(SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.MEDIUM, SSL_TRUSTSTORE_PASSWORD_DOC, false) + .define(SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, Importance.MEDIUM, SSL_KEYMANAGER_ALGORITHM_DOC, false) + .define(SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, Importance.MEDIUM, SSL_TRUSTMANAGER_ALGORITHM_DOC, false) + .define(SSL_CLIENT_REQUIRE_CERT_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, SSL_CLIENT_REQUIRE_CERT_DOC); + } + + public SecurityConfig(Map props) { + super(CONFIG, props); + } + + +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java new file mode 100644 index 0000000..ee8516f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + +/* + * Authentication for Channel + */ + +import java.io.IOException; +import com.sun.security.auth.UserPrincipal; + + +public interface Authenticator { + + /** + * Closes this channel + * + * @throws IOException if any I/O error occurs + */ + void close() throws IOException; + + /** + * + * @throws IOException + */ + void init() throws IOException; + + /** + * Returns UserPrincipal after authentication is established + */ + UserPrincipal userPrincipal(); + + + /** + * Does authentication and returns SelectionKey.OP if further communication needed + */ + int authenticate(boolean read, boolean write) throws IOException; + + /** + * returns true if authentication is complete otherwise returns false; + */ + + boolean isComplete(); + +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/Channel.java b/clients/src/main/java/org/apache/kafka/common/network/Channel.java new file mode 100644 index 0000000..3526ba3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/Channel.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + + +import java.io.IOException; +import java.io.DataInputStream; +import java.io.DataOutputStream; + +import java.nio.ByteBuffer; +import java.nio.channels.ScatteringByteChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.SocketChannel; + +import com.sun.security.auth.UserPrincipal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ + +public class Channel implements ScatteringByteChannel, GatheringByteChannel { + private static final Logger log = LoggerFactory.getLogger(Channel.class); + private TransportLayer transportLayer; + private Authenticator authenticator; + + + public Channel(TransportLayer transportLayer, Authenticator authenticator) throws IOException { + this.transportLayer = transportLayer; + this.authenticator = authenticator; + this.authenticator.init(); + } + + public void close() throws IOException { + transportLayer.close(); + authenticator.close(); + } + + /** + * returns user principal for the session + * Incase of PLAINTEXT and No Authentication returns ANONYMOUS as the userPrincipal + * If SSL used without any SASL Authentication returns SSLSession.peerPrincipal + */ + public UserPrincipal userPrincipal() { + return authenticator.userPrincipal(); + } + + public int connect(boolean read, boolean write) throws IOException { + if (transportLayer.isReady() && authenticator.isComplete()) + return 0; + int status = 0; + if (!transportLayer.isReady()) + status = transportLayer.handshake(read, write); + if (status == 0 && !authenticator.isComplete()) + status = authenticator.authenticate(read, write); + return status; + } + + + public boolean isOpen() { + return transportLayer.isOpen(); + } + + public SocketChannel socketChannel() { + return transportLayer.socketChannel(); + } + + /** + * Writes a sequence of bytes to this channel from the given buffer. + */ + @Override + public int write(ByteBuffer src) throws IOException { + return transportLayer.write(src); + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException { + return transportLayer.write(srcs); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + return transportLayer.write(srcs, offset, length); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return transportLayer.read(dst); + } + + @Override + public long read(ByteBuffer[] dsts) throws IOException { + return transportLayer.read(dsts); + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + return transportLayer.read(dsts, offset, length); + } + + public boolean finishConnect() throws IOException { + return transportLayer.finishConnect(); + } + + public boolean isReady() { + return transportLayer.isReady() && authenticator.isComplete(); + } + + public DataInputStream getInputStream() throws IOException { + return transportLayer.inStream(); + } + + public DataOutputStream getOutputStream() throws IOException { + return transportLayer.outStream(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java new file mode 100644 index 0000000..c1ec794 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + +import com.sun.security.auth.UserPrincipal; +import java.io.IOException; + +public class DefaultAuthenticator implements Authenticator { + + TransportLayer transportLayer; + + public DefaultAuthenticator(TransportLayer transportLayer) { + this.transportLayer = transportLayer; + } + + public void init() {} + + public int authenticate(boolean read, boolean write) throws IOException { + return 0; + } + + public UserPrincipal userPrincipal() { + return new UserPrincipal(transportLayer.getPeerPrincipal().toString()); + } + + public void close() throws IOException {} + + public boolean isComplete() { + return true; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java new file mode 100644 index 0000000..11cd80c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + +/* + * Transport layer for PLAINTEXT communication + */ + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import java.io.DataInputStream; +import java.io.DataOutputStream; + +import java.security.Principal; +import com.sun.security.auth.UserPrincipal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PlainTextTransportLayer implements TransportLayer { + private static final Logger log = LoggerFactory.getLogger(PlainTextTransportLayer.class); + SocketChannel socketChannel = null; + DataInputStream inStream = null; + DataOutputStream outStream = null; + + public PlainTextTransportLayer(SocketChannel socketChannel) throws IOException { + this.socketChannel = socketChannel; + + } + + + /** + * Closes this channel + * + * @throws IOException If and I/O error occurs + */ + public void close() throws IOException { + socketChannel.socket().close(); + socketChannel.close(); + } + + /** + * Flushes the buffer to the network, non blocking + * @param buf ByteBuffer + * @return boolean true if the buffer has been emptied out, false otherwise + * @throws IOException + */ + public boolean flush(ByteBuffer buf) throws IOException { + int remaining = buf.remaining(); + if (remaining > 0) { + int written = socketChannel.write(buf); + return written >= remaining; + } + return true; + } + + /** + * Tells wheter or not this channel is open. + */ + public boolean isOpen() { + return socketChannel.isOpen(); + } + + /** + * Writes a sequence of bytes to this channel from the given buffer. + */ + public int write(ByteBuffer src) throws IOException { + return socketChannel.write(src); + } + + public long write(ByteBuffer[] srcs) throws IOException { + return socketChannel.write(srcs); + } + + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + return socketChannel.write(srcs, offset, length); + } + + public int read(ByteBuffer dst) throws IOException { + return socketChannel.read(dst); + } + + public long read(ByteBuffer[] dsts) throws IOException { + return socketChannel.read(dsts); + } + + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + return socketChannel.read(dsts, offset, length); + } + + public boolean isReady() { + return true; + } + + public SocketChannel socketChannel() { + return socketChannel; + } + + public boolean finishConnect() throws IOException { + return socketChannel.finishConnect(); + } + + /** + * Performs SSL handshake hence is a no-op for the non-secure + * implementation + * @param read Unused in non-secure implementation + * @param write Unused in non-secure implementation + * @return Always return 0 + * @throws IOException + */ + public int handshake(boolean read, boolean write) throws IOException { + return 0; + } + + public DataInputStream inStream() throws IOException { + if (inStream == null) + this.inStream = new DataInputStream(socketChannel.socket().getInputStream()); + return inStream; + } + + public DataOutputStream outStream() throws IOException { + if (outStream == null) + this.outStream = new DataOutputStream(socketChannel.socket().getOutputStream()); + return outStream; + } + + public Principal getPeerPrincipal() { + return new UserPrincipal("ANONYMOUS"); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java new file mode 100644 index 0000000..9cf9051 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.io.FileInputStream; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.KeyStore; + +import javax.net.ssl.*; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SecurityConfig; + + +public class SSLFactory { + + public enum Mode { CLIENT, SERVER }; + private String protocol; + private String provider; + private String kmfAlgorithm; + private String tmfAlgorithm; + private SecurityStore keystore = null; + private String keyPassword; + private SecurityStore truststore; + private String[] cipherSuites; + private String[] enabledProtocols; + private SSLContext sslContext; + private boolean requireClientCert; + private Mode mode; + + + public SSLFactory(Mode mode) { + this.mode = mode; + } + + + public void init(SecurityConfig securityConfig) throws IOException, GeneralSecurityException { + this.protocol = securityConfig.getString(SecurityConfig.SSL_PROTOCOL_CONFIG); + if (securityConfig.getList(SecurityConfig.SSL_CIPHER_SUITES_CONFIG) != null) + this.cipherSuites = (String[]) securityConfig.getList(SecurityConfig.SSL_CIPHER_SUITES_CONFIG).toArray(); + if (securityConfig.getList(SecurityConfig.SSL_ENABLED_PROTOCOLS_CONFIG) != null) + this.enabledProtocols = (String[]) securityConfig.getList(SecurityConfig.SSL_ENABLED_PROTOCOLS_CONFIG).toArray(); + this.requireClientCert = securityConfig.getBoolean(SecurityConfig.SSL_CLIENT_REQUIRE_CERT_CONFIG); + this.kmfAlgorithm = securityConfig.getString(SecurityConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG); + this.tmfAlgorithm = securityConfig.getString(SecurityConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG); + if ((mode == Mode.CLIENT && requireClientCert) || (mode == Mode.SERVER)) + createKeystore(securityConfig.getString(SecurityConfig.SSL_KEYSTORE_TYPE_CONFIG), + securityConfig.getString(SecurityConfig.SSL_KEYSTORE_LOCATION_CONFIG), + securityConfig.getString(SecurityConfig.SSL_KEYSTORE_PASSWORD_CONFIG), + securityConfig.getString(SecurityConfig.SSL_KEY_PASSWORD_CONFIG)); + createTruststore(securityConfig.getString(SecurityConfig.SSL_TRUSTSTORE_TYPE_CONFIG), + securityConfig.getString(SecurityConfig.SSL_TRUSTSTORE_LOCATION_CONFIG), + securityConfig.getString(SecurityConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + + this.sslContext = createSSLContext(); + + } + + + private SSLContext createSSLContext() throws GeneralSecurityException, IOException { + SSLContext sslContext; + if (provider != null) + sslContext = SSLContext.getInstance(protocol, provider); + else + sslContext = SSLContext.getInstance(protocol); + + KeyManager[] keyManagers = null; + if (keystore != null) { + String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm(); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm); + KeyStore ks = keystore.load(); + String keyPassword = this.keyPassword != null ? this.keyPassword : keystore.password; + kmf.init(ks, keyPassword.toCharArray()); + keyManagers = kmf.getKeyManagers(); + } + + String tmfAlgorithm = this.tmfAlgorithm != null ? this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm(); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm); + KeyStore ts = truststore == null ? null : truststore.load(); + tmf.init(ts); + + sslContext.init(keyManagers, tmf.getTrustManagers(), null); + return sslContext; + } + + public SSLEngine createSSLEngine(String peerHost, int peerPort) { + SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort); + if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites); + if (mode == Mode.SERVER) { + sslEngine.setUseClientMode(false); + } else { + sslEngine.setUseClientMode(true); + sslEngine.setNeedClientAuth(requireClientCert); + } + if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols); + return sslEngine; + } + + /** + * Returns a configured SSLServerSocketFactory. + * + * @return the configured SSLSocketFactory. + * @throws GeneralSecurityException thrown if the SSLSocketFactory could not + * be initialized. + * @throws IOException thrown if and IO error occurred while loading + * the server keystore. + */ + public SSLServerSocketFactory createSSLServerSocketFactory() throws GeneralSecurityException, IOException { + if (mode != Mode.SERVER) { + throw new IllegalStateException("Factory is in CLIENT mode"); + } + return sslContext.getServerSocketFactory(); + } + + /** + * Returns if client certificates are required or not. + * + * @return if client certificates are required or not. + */ + public boolean isClientCertRequired() { + return requireClientCert; + } + + + private void createKeystore(String type, String path, String password, String keyPassword) { + if (path == null && password != null) { + throw new KafkaException("SSL key store password is not specified."); + } else if (path != null && password == null) { + throw new KafkaException("SSL key store is not specified, but key store password is specified."); + } else if (path != null && password != null) { + this.keystore = new SecurityStore(type, path, password); + this.keyPassword = keyPassword; + } + } + + private void createTruststore(String type, String path, String password) { + if (path == null && password != null) { + throw new KafkaException("SSL key store password is not specified."); + } else if (path != null && password == null) { + throw new KafkaException("SSL key store is not specified, but key store password is specified."); + } else if (path != null && password != null) { + this.truststore = new SecurityStore(type, path, password); + } + } + + + private class SecurityStore { + private final String type; + private final String path; + private final String password; + + private SecurityStore(String type, String path, String password) { + this.type = type == null ? KeyStore.getDefaultType() : type; + this.path = path; + this.password = password; + } + + private KeyStore load() throws GeneralSecurityException, IOException { + FileInputStream in = null; + try { + KeyStore ks = KeyStore.getInstance(type); + in = new FileInputStream(path); + ks.load(in, password.toCharArray()); + return ks; + } finally { + if (in != null) in.close(); + } + } + } + + public void close() { + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java new file mode 100644 index 0000000..dc84975 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.channels.SelectionKey; + +import java.security.Principal; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; + +import java.io.DataInputStream; +import java.io.DataOutputStream; + +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * Transport layer for SSL communication + */ + +public class SSLTransportLayer implements TransportLayer { + private static final Logger log = LoggerFactory.getLogger(SSLTransportLayer.class); + SocketChannel socketChannel; + SSLEngine sslEngine; + HandshakeStatus handshakeStatus = null; + SSLEngineResult handshakeResult = null; + boolean handshakeComplete = false; + boolean closed = false; + boolean closing = false; + ByteBuffer netInBuffer = null; + ByteBuffer netOutBuffer = null; + ByteBuffer appReadBuffer = null; + ByteBuffer appWriteBuffer = null; + ByteBuffer emptyBuf = ByteBuffer.allocate(0); + DataInputStream inStream = null; + DataOutputStream outStream = null; + + + public SSLTransportLayer(SocketChannel socketChannel, SSLEngine sslEngine) throws IOException { + this.socketChannel = socketChannel; + this.sslEngine = sslEngine; + this.netInBuffer = ByteBuffer.allocateDirect(sslEngine.getSession().getPacketBufferSize()); + this.netOutBuffer = ByteBuffer.allocateDirect(sslEngine.getSession().getPacketBufferSize()); + this.appWriteBuffer = ByteBuffer.allocateDirect(sslEngine.getSession().getApplicationBufferSize()); + this.appReadBuffer = ByteBuffer.allocateDirect(sslEngine.getSession().getApplicationBufferSize()); + startHandshake(); + } + + public void startHandshake() throws IOException { + netOutBuffer.position(0); + netOutBuffer.limit(0); + netInBuffer.position(0); + netInBuffer.limit(0); + handshakeComplete = false; + closed = false; + closing = false; + //initiate handshake + sslEngine.beginHandshake(); + handshakeStatus = sslEngine.getHandshakeStatus(); + } + + public SocketChannel socketChannel() { + return socketChannel; + } + + public boolean finishConnect() throws IOException { + return socketChannel.finishConnect(); + } + + /** + * Flushes the buffer to the network, non blocking + * @param buf ByteBuffer + * @return boolean true if the buffer has been emptied out, false otherwise + * @throws IOException + */ + public boolean flush(ByteBuffer buf) throws IOException { + int remaining = buf.remaining(); + if (remaining > 0) { + int written = socketChannel.write(buf); + return written >= remaining; + } + return true; + } + + /** + * Performs SSL handshake, non blocking. + * The return for this operation is 0 if the handshake is complete and a positive value if it is not complete. + * In the event of a positive value coming back, re-register the selection key for the return values interestOps. + * @param read boolean - true if the underlying channel is readable + * @param write boolean - true if the underlying channel is writable + * @return int - 0 if hand shake is complete, otherwise it returns a SelectionKey interestOps value + * @throws IOException + */ + public int handshake(boolean read, boolean write) throws IOException { + if (handshakeComplete) return 0; //we have done our initial handshake + + if (!flush(netOutBuffer)) return SelectionKey.OP_WRITE; + + switch(handshakeStatus) { + case NOT_HANDSHAKING: + // SSLEnginge.getHandshakeStatus is transient and it doesn't record FINISHED status properly + if (handshakeResult.getHandshakeStatus() == HandshakeStatus.FINISHED) { + handshakeComplete = !netOutBuffer.hasRemaining(); + if (handshakeComplete) + return 0; + else + return SelectionKey.OP_WRITE; + } else { + //should never happen + throw new IOException("NOT_HANDSHAKING during handshake"); + } + case FINISHED: + //we are complete if we have delivered the last package + handshakeComplete = !netOutBuffer.hasRemaining(); + //return 0 if we are complete, otherwise we still have data to write + if (handshakeComplete) return 0; + else return SelectionKey.OP_WRITE; + case NEED_WRAP: + handshakeResult = handshakeWrap(write); + if (handshakeResult.getStatus() == Status.OK) { + if (handshakeStatus == HandshakeStatus.NEED_TASK) + handshakeStatus = tasks(); + } else { + //wrap should always work with our buffers + throw new IOException("Unexpected status [" + handshakeResult.getStatus() + "] during handshake WRAP."); + } + if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || (!flush(netOutBuffer))) + return SelectionKey.OP_WRITE; + //fall down to NEED_UNWRAP on the same call, will result in a + //BUFFER_UNDERFLOW if it needs data + case NEED_UNWRAP: + handshakeResult = handshakeUnwrap(read); + if (handshakeResult.getStatus() == Status.OK) { + if (handshakeStatus == HandshakeStatus.NEED_TASK) + handshakeStatus = tasks(); + } else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) { + return SelectionKey.OP_READ; + } else { + throw new IOException(String.format("Unexpected status [%s] during handshake UNWRAP", handshakeStatus)); + } + break; + case NEED_TASK: + handshakeStatus = tasks(); + break; + default: + throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus)); + } + //return 0 if we are complete, otherwise re-register for any activity that + //would cause this method to be called again. + if (handshakeComplete) return 0; + else return SelectionKey.OP_WRITE | SelectionKey.OP_READ; + } + + /** + * Executes all the tasks needed on the same thread. + * @return HandshakeStatus + */ + private HandshakeStatus tasks() { + Runnable r = null; + while ((r = sslEngine.getDelegatedTask()) != null) r.run(); + return sslEngine.getHandshakeStatus(); + } + + /** + * Performs the WRAP function + * @param doWrite boolean + * @return SSLEngineResult + * @throws IOException + */ + private SSLEngineResult handshakeWrap(Boolean doWrite) throws IOException { + //this should never be called with a network buffer that contains data + //so we can clear it here. + netOutBuffer.clear(); + SSLEngineResult result = sslEngine.wrap(appWriteBuffer, netOutBuffer); + //prepare the results to be written + netOutBuffer.flip(); + handshakeStatus = result.getHandshakeStatus(); + //optimization, if we do have a writable channel, write it now + if (doWrite) flush(netOutBuffer); + return result; + } + + /** + * Perform handshake unwrap + * @param doRead boolean + * @return SSLEngineResult + * @throws IOException + */ + private SSLEngineResult handshakeUnwrap(Boolean doRead) throws IOException { + if (netInBuffer.position() == netInBuffer.limit()) { + //clear the buffer if we have emptied it out on data + netInBuffer.clear(); + } + + if (doRead) { + int read = socketChannel.read(netInBuffer); + if (read == -1) throw new IOException("EOF during handshake."); + } + + SSLEngineResult result; + boolean cont = false; + do { + //prepare the buffer with the incoming data + netInBuffer.flip(); + result = sslEngine.unwrap(netInBuffer, appWriteBuffer); + netInBuffer.compact(); + handshakeStatus = result.getHandshakeStatus(); + if (result.getStatus() == SSLEngineResult.Status.OK && + result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { + handshakeStatus = tasks(); + } + cont = result.getStatus() == SSLEngineResult.Status.OK && + handshakeStatus == HandshakeStatus.NEED_UNWRAP; + } while(cont); + return result; + } + + + public int getOutboundRemaining() { + return netOutBuffer.remaining(); + } + + /** + * Sends a SSL close message, will not physically close the connection here.
+ * @throws IOException if an I/O error occurs + * @throws IOException if there is data on the outgoing network buffer and we are unable to flush it + */ + public void close() throws IOException { + if (closing) return; + closing = true; + sslEngine.closeOutbound(); + + if (!flush(netOutBuffer)) { + throw new IOException("Remaining data in the network buffer, can't send SSL close message, force a close with close(true) instead"); + } + //prep the buffer for the close message + netOutBuffer.clear(); + //perform the close, since we called sslEngine.closeOutbound + SSLEngineResult handshake = sslEngine.wrap(emptyBuf, netOutBuffer); + //we should be in a close state + if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) { + throw new IOException("Invalid close state, will not send network data."); + } + netOutBuffer.flip(); + flush(netOutBuffer); + socketChannel.socket().close(); + socketChannel.close(); + closed = !netOutBuffer.hasRemaining() && (handshake.getHandshakeStatus() != HandshakeStatus.NEED_WRAP); + } + + public boolean isOpen() { + return socketChannel.isOpen(); + } + + public boolean isReady() { + return handshakeComplete; + } + + /** + * Reads a sequence of bytes from this channel into the given buffer. + * + * @param dst The buffer into which bytes are to be transferred + * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream + * @throws IOException if some other I/O error occurs + * @throws IllegalStateException if the destination buffer is different than appBufHandler.getReadBuffer() + */ + public int read(ByteBuffer dst) throws IOException { + if (closing || closed) return -1; + if (!handshakeComplete) throw new IllegalStateException("Handshake incomplete."); + netInBuffer = Utils.ensureCapacity(netInBuffer, packetBufferSize()); + int netread = socketChannel.read(netInBuffer); + if (netread == -1) return -1; + int read = 0; + SSLEngineResult unwrap = null; + + do { + netInBuffer.flip(); + unwrap = sslEngine.unwrap(netInBuffer, appReadBuffer); + //compact the buffer + netInBuffer.compact(); + if (unwrap.getStatus() == Status.OK || unwrap.getStatus() == Status.BUFFER_UNDERFLOW) { + read += unwrap.bytesProduced(); + // perform any task if needed + if (unwrap.getHandshakeStatus() == HandshakeStatus.NEED_TASK) tasks(); + //if we need more network data, than return for now. + if (unwrap.getStatus() == Status.BUFFER_UNDERFLOW) return readFromAppBuffer(dst); + } else if (unwrap.getStatus() == Status.BUFFER_OVERFLOW && read > 0) { + appReadBuffer = Utils.ensureCapacity(appReadBuffer, applicationBufferSize()); + //buffer overflow can happen, if we have read data, then + //empty out the dst buffer before we do another read + return readFromAppBuffer(dst); + } else { + //here we should trap BUFFER_OVERFLOW and call expand on the buffer + // for now, throw an exception, as we initialized the buffers + // in constructor + throw new IOException(String.format("Unable to unwrap data, invalid status [%s]", unwrap.getStatus())); + } + } while(netInBuffer.position() != 0); + return readFromAppBuffer(dst); + } + + public long read(ByteBuffer[] dsts) throws IOException { + return read(dsts, 0, dsts.length); + } + + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + int totalRead = 0; + for (int i = offset; i < length; i++) { + int read = read(dsts[i]); + if (read > 0) { + totalRead += read; + } + } + return totalRead; + } + + + /** + * Writes a sequence of bytes to this channel from the given buffer. + * + * @param src The buffer from which bytes are to be retrieved + * @return The number of bytes written, possibly zero + * @throws IOException If some other I/O error occurs + */ + + public int write(ByteBuffer src) throws IOException { + int written = 0; + if (src == this.netOutBuffer) + written = socketChannel.write(src); + else { + if (closing || closed) throw new IOException("Channel is in closing state"); + if (!flush(netOutBuffer)) + return written; + netOutBuffer.clear(); + SSLEngineResult result = sslEngine.wrap(src, netOutBuffer); + written = result.bytesConsumed(); + netOutBuffer.flip(); + if (result.getStatus() == Status.OK) { + if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) + tasks(); + } else { + throw new IOException(String.format("Unable to wrap data, invalid status %s", result.getStatus())); + } + flush(netOutBuffer); + } + return written; + } + + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + int totalWritten = 0; + for (int i = offset; i < length; i++) { + if (srcs[i].hasRemaining()) { + int written = write(srcs[i]); + if (written > 0) { + totalWritten += written; + } + } + } + return totalWritten; + } + + public long write(ByteBuffer[] srcs) throws IOException { + return write(srcs, 0, srcs.length); + } + + public DataInputStream inStream() throws IOException { + if (inStream == null) + this.inStream = new DataInputStream(socketChannel.socket().getInputStream()); + return inStream; + } + + public DataOutputStream outStream() throws IOException { + if (outStream == null) + this.outStream = new DataOutputStream(socketChannel.socket().getOutputStream()); + return outStream; + } + + public Principal getPeerPrincipal() { + //return sslEngine.getSession().getPeerPrincipal(); + return null; + } + + private int readFromAppBuffer(ByteBuffer dst) { + appReadBuffer.flip(); + try { + int remaining = appReadBuffer.remaining(); + if (remaining > 0) { + if (remaining > dst.remaining()) + remaining = dst.remaining(); + int i = 0; + while (i < remaining) { + dst.put(appReadBuffer.get()); + i++; + } + } + return remaining; + } finally { + appReadBuffer.compact(); + } + } + + private int packetBufferSize() { + return sslEngine.getSession().getPacketBufferSize(); + } + + private int applicationBufferSize() { + return sslEngine.getSession().getApplicationBufferSize(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index b5f8d83..4a0c2bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.common.network; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 57de058..0068143 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SecurityConfig; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.MetricName; @@ -40,6 +41,7 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,25 +53,25 @@ import org.slf4j.LoggerFactory; * responses. *

* A connection can be added to the selector associated with an integer id by doing - * + * *

  * selector.connect(42, new InetSocketAddress("google.com", server.port), 64000, 64000);
  * 
- * + * * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating * the connection. The successful invocation of this method does not mean a valid connection has been established. - * + * * Sending requests, receiving responses, processing connection completions, and disconnections on the existing * connections are all done using the poll() call. - * + * *
  * List<NetworkRequest> requestsToSend = Arrays.asList(new NetworkRequest(0, myBytes), new NetworkRequest(1, myOtherBytes));
  * selector.poll(TIMEOUT_MS, requestsToSend);
  * 
- * + * * The selector maintains several lists that are reset by each call to poll() which are available via * various getters. These are reset by each call to poll(). - * + * * This class is not thread safe! */ public class Selector implements Selectable { @@ -78,6 +80,7 @@ public class Selector implements Selectable { private final java.nio.channels.Selector selector; private final Map keys; + private final Map channels; private final List completedSends; private final List completedReceives; private final List disconnected; @@ -87,11 +90,14 @@ public class Selector implements Selectable { private final SelectorMetrics sensors; private final String metricGrpPrefix; private final Map metricTags; + private final SecurityConfig securityConfig; + private final SecurityProtocol securityProtocol; + private SSLFactory sslFactory = null; /** * Create a new selector */ - public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map metricTags) { + public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map metricTags, SecurityConfig securityConfig) { try { this.selector = java.nio.channels.Selector.open(); } catch (IOException e) { @@ -101,12 +107,24 @@ public class Selector implements Selectable { this.metricGrpPrefix = metricGrpPrefix; this.metricTags = metricTags; this.keys = new HashMap(); + this.channels = new HashMap(); this.completedSends = new ArrayList(); this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); this.failedSends = new ArrayList(); this.sensors = new SelectorMetrics(metrics); + this.securityConfig = securityConfig; + this.securityProtocol = SecurityProtocol.valueOf(securityConfig.getString(SecurityConfig.SECURITY_PROTOCOL_CONFIG)); + try { + if (securityProtocol == SecurityProtocol.SSL) { + this.sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT); + this.sslFactory.init(this.securityConfig); + } + } catch (Exception e) { + throw new KafkaException(e); + } + } /** @@ -127,25 +145,37 @@ public class Selector implements Selectable { if (this.keys.containsKey(id)) throw new IllegalStateException("There is already a connection for id " + id); - SocketChannel channel = SocketChannel.open(); - channel.configureBlocking(false); - Socket socket = channel.socket(); + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + Socket socket = socketChannel.socket(); socket.setKeepAlive(true); socket.setSendBufferSize(sendBufferSize); socket.setReceiveBufferSize(receiveBufferSize); socket.setTcpNoDelay(true); try { - channel.connect(address); + socketChannel.connect(address); } catch (UnresolvedAddressException e) { - channel.close(); + socketChannel.close(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException e) { - channel.close(); + socketChannel.close(); throw e; } - SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT); + + TransportLayer transportLayer; + if (securityProtocol == SecurityProtocol.SSL) { + transportLayer = new SSLTransportLayer(socketChannel, + sslFactory.createSSLEngine(socket.getInetAddress().getHostName(), + socket.getPort())); + } else { + transportLayer = new PlainTextTransportLayer(socketChannel); + } + Authenticator authenticator = new DefaultAuthenticator(transportLayer); + Channel channel = new Channel(transportLayer, authenticator); + SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT); key.attach(new Transmissions(id)); this.keys.put(id, key); + this.channels.put(key, channel); } /** @@ -202,12 +232,12 @@ public class Selector implements Selectable { /** * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing * disconnections, initiating new sends, or making progress on in-progress sends or receives. - * + * * When this call is completed the user can check for completed sends, receives, connections or disconnects using * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These * lists will be cleared at the beginning of each {@link #poll(long, List)} call and repopulated by the call if any * completed I/O. - * + * * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely. * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is * already an in-progress send @@ -230,7 +260,7 @@ public class Selector implements Selectable { iter.remove(); Transmissions transmissions = transmissions(key); - SocketChannel channel = channel(key); + Channel channel = channel(key); // register all per-broker metrics at once sensors.maybeRegisterNodeMetrics(transmissions.id); @@ -242,29 +272,46 @@ public class Selector implements Selectable { key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); this.connected.add(transmissions.id); this.sensors.connectionCreated.record(); + } /* read from any connections that have readable data */ if (key.isReadable()) { - if (!transmissions.hasReceive()) - transmissions.receive = new NetworkReceive(transmissions.id); - transmissions.receive.readFrom(channel); - if (transmissions.receive.complete()) { - transmissions.receive.payload().rewind(); - this.completedReceives.add(transmissions.receive); - this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit()); - transmissions.clearReceive(); + if (!channel.isReady()) { + int status = channel.connect(key.isReadable(), key.isWritable()); + if (status == 0) + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); + else + key.interestOps(status); + } else { + if (!transmissions.hasReceive()) + transmissions.receive = new NetworkReceive(transmissions.id); + transmissions.receive.readFrom(channel); + if (transmissions.receive.complete()) { + transmissions.receive.payload().rewind(); + this.completedReceives.add(transmissions.receive); + this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit()); + transmissions.clearReceive(); + } } } /* write to any sockets that have space in their buffer and for which we have data */ if (key.isWritable()) { - transmissions.send.writeTo(channel); - if (transmissions.send.remaining() <= 0) { - this.completedSends.add(transmissions.send); - this.sensors.recordBytesSent(transmissions.id, transmissions.send.size()); - transmissions.clearSend(); - key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + if (!channel.isReady()) { + int status = channel.connect(key.isReadable(), key.isWritable()); + if (status == 0) + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); + else + key.interestOps(status); + } else { + transmissions.send.writeTo(channel); + if (transmissions.send.remaining() <= 0) { + this.completedSends.add(transmissions.send); + this.sensors.recordBytesSent(transmissions.id, transmissions.send.size()); + transmissions.clearSend(); + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + } } } @@ -287,9 +334,9 @@ public class Selector implements Selectable { long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); } - - private String socketDescription(SocketChannel channel) { - Socket socket = channel.socket(); + + private String socketDescription(Channel channel) { + Socket socket = channel.socketChannel().socket(); if (socket == null) return "[unconnected socket]"; else if (socket.getInetAddress() != null) @@ -362,7 +409,7 @@ public class Selector implements Selectable { /** * Check for data, waiting up to the given timeout. - * + * * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely. * @return The number of keys ready * @throws IOException @@ -380,7 +427,8 @@ public class Selector implements Selectable { * Begin closing this connection */ private void close(SelectionKey key) { - SocketChannel channel = channel(key); + Channel channel = channel(key); + this.channels.remove(key); Transmissions trans = transmissions(key); if (trans != null) { this.keys.remove(trans.id); @@ -390,7 +438,6 @@ public class Selector implements Selectable { key.attach(null); key.cancel(); try { - channel.socket().close(); channel.close(); } catch (IOException e) { log.error("Exception closing connection to node {}:", trans.id, e); @@ -418,8 +465,8 @@ public class Selector implements Selectable { /** * Get the socket channel associated with this selection key */ - private SocketChannel channel(SelectionKey key) { - return (SocketChannel) key.channel(); + private Channel channel(SelectionKey key) { + return this.channels.get(key); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java new file mode 100644 index 0000000..6ce013b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + +/* + * Transport layer for underlying communication + */ +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import java.io.DataInputStream; +import java.io.DataOutputStream; + +import java.security.Principal; + + +public interface TransportLayer { + + /** + * Closes this channel + * + * @throws IOException If and I/O error occurs + */ + void close() throws IOException; + + + /** + * Tells wheather or not this channel is open. + */ + boolean isOpen(); + + /** + * Writes a sequence of bytes to this channel from the given buffer. + */ + int write(ByteBuffer src) throws IOException; + + long write(ByteBuffer[] srcs) throws IOException; + + long write(ByteBuffer[] srcs, int offset, int length) throws IOException; + + int read(ByteBuffer dst) throws IOException; + + long read(ByteBuffer[] dsts) throws IOException; + + long read(ByteBuffer[] dsts, int offset, int length) throws IOException; + + boolean isReady(); + + boolean finishConnect() throws IOException; + + SocketChannel socketChannel(); + + /** + * Performs SSL handshake hence is a no-op for the non-secure + * implementation + * @param read Unused in non-secure implementation + * @param write Unused in non-secure implementation + * @return Always return 0 + * @throws IOException + */ + int handshake(boolean read, boolean write) throws IOException; + + DataInputStream inStream() throws IOException; + + DataOutputStream outStream() throws IOException; + + boolean flush(ByteBuffer buffer) throws IOException; + + Principal getPeerPrincipal(); +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java index d3394ee..93cabd1 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java @@ -24,6 +24,8 @@ import java.util.Map; public enum SecurityProtocol { /** Un-authenticated, non-encrypted channel */ PLAINTEXT(0, "PLAINTEXT"), + /** SSL channe */ + SSL(1, "PLAINTEXT"), /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */ TRACE(Short.MAX_VALUE, "TRACE"); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index f73eedb..9382060 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -479,4 +479,15 @@ public class Utils { public static String readFileAsString(String path) throws IOException { return Utils.readFileAsString(path, Charset.defaultCharset()); } + + public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength) { + if (newLength > existingBuffer.capacity()) { + ByteBuffer newBuffer = ByteBuffer.allocate(newLength); + existingBuffer.flip(); + newBuffer.put(existingBuffer); + return newBuffer; + } + return existingBuffer; + } + } diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java new file mode 100644 index 0000000..47dda69 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.network; + +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.protocol.SecurityProtocol; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * A simple server that takes size delimited byte arrays and just echos them back to the sender. + */ +class EchoServer extends Thread { + public final int port; + private final ServerSocket serverSocket; + private final List threads; + private final List sockets; + private SecurityProtocol protocol; + private SSLFactory sslFactory; + private final AtomicBoolean startHandshake = new AtomicBoolean(); + + public EchoServer(SecurityConfig securityConfig) throws Exception { + this.protocol = SecurityProtocol.valueOf(securityConfig.getString(SecurityConfig.SECURITY_PROTOCOL_CONFIG)); + if (protocol == SecurityProtocol.SSL) { + this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER); + this.sslFactory.init(securityConfig); + this.serverSocket = sslFactory.createSSLServerSocketFactory().createServerSocket(0); + this.startHandshake.set(true); + } else { + this.serverSocket = new ServerSocket(0); + } + this.port = this.serverSocket.getLocalPort(); + this.threads = Collections.synchronizedList(new ArrayList()); + this.sockets = Collections.synchronizedList(new ArrayList()); + } + + + @Override + public void run() { + try { + while (true) { + final Socket socket = serverSocket.accept(); + sockets.add(socket); + Thread thread = new Thread() { + @Override + public void run() { + try { + DataInputStream input = new DataInputStream(socket.getInputStream()); + DataOutputStream output = new DataOutputStream(socket.getOutputStream()); + while (socket.isConnected() && !socket.isClosed()) { + int size = input.readInt(); + byte[] bytes = new byte[size]; + input.readFully(bytes); + output.writeInt(size); + output.write(bytes); + output.flush(); + } + } catch (IOException e) { + // ignore + } finally { + try { + socket.close(); + } catch (IOException e) { + // ignore + } + } + } + }; + thread.start(); + threads.add(thread); + } + } catch (IOException e) { + // ignore + } + } + + public void closeConnections() throws IOException { + for (Socket socket : sockets) + socket.close(); + } + + public void close() throws IOException, InterruptedException { + this.serverSocket.close(); + closeConnections(); + for (Thread t : threads) + t.join(); + join(); + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java new file mode 100644 index 0000000..f3c6153 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.network; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; + +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.test.TestSSLUtils; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses. + */ + +public class SSLSelectorTest { + + private static final int BUFFER_SIZE = 4 * 1024; + + private EchoServer server; + private Selectable selector; + + @Before + public void setup() throws Exception { + SecurityConfig serverSecurityConfig = TestSSLUtils.createSSLConfigFile(SSLFactory.Mode.SERVER, null); + this.server = new EchoServer(serverSecurityConfig); + this.server.start(); + String trustStoreServer = serverSecurityConfig.getString(SecurityConfig.SSL_TRUSTSTORE_LOCATION_CONFIG); + SecurityConfig clientSecurityConfig = TestSSLUtils.createSSLConfigFile(SSLFactory.Mode.CLIENT, trustStoreServer); + this.selector = new Selector(new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap(), clientSecurityConfig); + } + + @After + public void teardown() throws Exception { + this.selector.close(); + this.server.close(); + } + + + /** + * Validate that we can send and receive a message larger than the receive and send buffer size + */ + @Test + public void testSendLargeRequest() throws Exception { + int node = 0; + blockingConnect(node); + String big = TestUtils.randomString(10 * BUFFER_SIZE); + assertEquals(big, blockingRequest(node, big)); + } + + private String blockingRequest(int node, String s) throws IOException { + selector.send(createSend(node, s)); + selector.poll(1000L); + while (true) { + selector.poll(1000L); + for (NetworkReceive receive : selector.completedReceives()) + if (receive.source() == node) + return asString(receive); + } + } + + private String asString(NetworkReceive receive) { + return new String(Utils.toArray(receive.payload())); + } + + private NetworkSend createSend(int node, String s) { + return new NetworkSend(node, ByteBuffer.wrap(s.getBytes())); + } + + /* connect and wait for the connection to complete */ + private void blockingConnect(int node) throws IOException { + selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); + while (!selector.connected().contains(node)) + selector.poll(10000L); + } + +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index d5b306b..e4100d3 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -15,18 +15,14 @@ package org.apache.kafka.common.network; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.common.config.SecurityConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; @@ -47,9 +43,10 @@ public class SelectorTest { @Before public void setup() throws Exception { - this.server = new EchoServer(); + SecurityConfig securityConfig = ClientUtils.parseSecurityConfig(""); + this.server = new EchoServer(securityConfig); this.server.start(); - this.selector = new Selector(new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap()); + this.selector = new Selector(new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap(), securityConfig); } @After @@ -264,71 +261,5 @@ public class SelectorTest { return new String(Utils.toArray(receive.payload())); } - /** - * A simple server that takes size delimited byte arrays and just echos them back to the sender. - */ - static class EchoServer extends Thread { - public final int port; - private final ServerSocket serverSocket; - private final List threads; - private final List sockets; - - public EchoServer() throws Exception { - this.serverSocket = new ServerSocket(0); - this.port = this.serverSocket.getLocalPort(); - this.threads = Collections.synchronizedList(new ArrayList()); - this.sockets = Collections.synchronizedList(new ArrayList()); - } - - public void run() { - try { - while (true) { - final Socket socket = serverSocket.accept(); - sockets.add(socket); - Thread thread = new Thread() { - public void run() { - try { - DataInputStream input = new DataInputStream(socket.getInputStream()); - DataOutputStream output = new DataOutputStream(socket.getOutputStream()); - while (socket.isConnected() && !socket.isClosed()) { - int size = input.readInt(); - byte[] bytes = new byte[size]; - input.readFully(bytes); - output.writeInt(size); - output.write(bytes); - output.flush(); - } - } catch (IOException e) { - // ignore - } finally { - try { - socket.close(); - } catch (IOException e) { - // ignore - } - } - } - }; - thread.start(); - threads.add(thread); - } - } catch (IOException e) { - // ignore - } - } - - public void closeConnections() throws IOException { - for (Socket socket : sockets) - socket.close(); - } - - public void close() throws IOException, InterruptedException { - this.serverSocket.close(); - closeConnections(); - for (Thread t : threads) - t.join(); - join(); - } - } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 2ebe3c2..bf2b5bd 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -100,4 +100,6 @@ public class UtilsTest { buffer = ByteBuffer.wrap(myvar).asReadOnlyBuffer(); this.subTest(buffer); } + + } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java new file mode 100644 index 0000000..c811096 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.network.SSLFactory; +import sun.security.x509.AlgorithmId; +import sun.security.x509.CertificateAlgorithmId; +import sun.security.x509.CertificateIssuerName; +import sun.security.x509.CertificateSerialNumber; +import sun.security.x509.CertificateSubjectName; +import sun.security.x509.CertificateValidity; +import sun.security.x509.CertificateVersion; +import sun.security.x509.CertificateX509Key; +import sun.security.x509.X500Name; +import sun.security.x509.X509CertImpl; +import sun.security.x509.X509CertInfo; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.math.BigInteger; +import java.security.GeneralSecurityException; +import java.security.Key; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.SecureRandom; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +public class TestSSLUtils { + + /** + * Create a self-signed X.509 Certificate. + * From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html. + * + * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB" + * @param pair the KeyPair + * @param days how many days from now the Certificate is valid for + * @param algorithm the signing algorithm, eg "SHA1withRSA" + * @return the self-signed certificate + * @throws IOException thrown if an IO error ocurred. + * @throws GeneralSecurityException thrown if an Security error ocurred. + */ + public static X509Certificate generateCertificate(String dn, KeyPair pair, + int days, String algorithm) throws GeneralSecurityException, IOException { + PrivateKey privkey = pair.getPrivate(); + X509CertInfo info = new X509CertInfo(); + Date from = new Date(); + Date to = new Date(from.getTime() + days * 86400000L); + CertificateValidity interval = new CertificateValidity(from, to); + BigInteger sn = new BigInteger(64, new SecureRandom()); + X500Name owner = new X500Name(dn); + + info.set(X509CertInfo.VALIDITY, interval); + info.set(X509CertInfo.SERIAL_NUMBER, new CertificateSerialNumber(sn)); + info.set(X509CertInfo.SUBJECT, new CertificateSubjectName(owner)); + info.set(X509CertInfo.ISSUER, new CertificateIssuerName(owner)); + info.set(X509CertInfo.KEY, new CertificateX509Key(pair.getPublic())); + info + .set(X509CertInfo.VERSION, new CertificateVersion(CertificateVersion.V3)); + AlgorithmId algo = new AlgorithmId(AlgorithmId.md5WithRSAEncryption_oid); + info.set(X509CertInfo.ALGORITHM_ID, new CertificateAlgorithmId(algo)); + + // Sign the cert to identify the algorithm that's used. + X509CertImpl cert = new X509CertImpl(info); + cert.sign(privkey, algorithm); + + // Update the algorith, and resign. + algo = (AlgorithmId) cert.get(X509CertImpl.SIG_ALG); + info + .set(CertificateAlgorithmId.NAME + "." + CertificateAlgorithmId.ALGORITHM, + algo); + cert = new X509CertImpl(info); + cert.sign(privkey, algorithm); + return cert; + } + + public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException { + KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm); + keyGen.initialize(1024); + return keyGen.genKeyPair(); + } + + private static KeyStore createEmptyKeyStore() throws GeneralSecurityException, IOException { + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(null, null); // initialize + return ks; + } + + private static void saveKeyStore(KeyStore ks, String filename, + String password) throws GeneralSecurityException, IOException { + FileOutputStream out = new FileOutputStream(filename); + try { + ks.store(out, password.toCharArray()); + } finally { + out.close(); + } + } + + public static void createKeyStore(String filename, + String password, String alias, + Key privateKey, Certificate cert) throws GeneralSecurityException, IOException { + KeyStore ks = createEmptyKeyStore(); + ks.setKeyEntry(alias, privateKey, password.toCharArray(), + new Certificate[]{cert}); + saveKeyStore(ks, filename, password); + } + + /** + * Creates a keystore with a single key and saves it to a file. + * + * @param filename String file to save + * @param password String store password to set on keystore + * @param keyPassword String key password to set on key + * @param alias String alias to use for the key + * @param privateKey Key to save in keystore + * @param cert Certificate to use as certificate chain associated to key + * @throws GeneralSecurityException for any error with the security APIs + * @throws IOException if there is an I/O error saving the file + */ + public static void createKeyStore(String filename, + String password, String keyPassword, String alias, + Key privateKey, Certificate cert) throws GeneralSecurityException, IOException { + KeyStore ks = createEmptyKeyStore(); + ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(), + new Certificate[]{cert}); + saveKeyStore(ks, filename, password); + } + + public static void createTrustStore(String filename, + String password, String alias, + Certificate cert) throws GeneralSecurityException, IOException { + KeyStore ks = createEmptyKeyStore(); + ks.setCertificateEntry(alias, cert); + saveKeyStore(ks, filename, password); + } + + public static void createTrustStore( + String filename, String password, Map certs) throws GeneralSecurityException, IOException { + KeyStore ks = createEmptyKeyStore(); + for (Map.Entry cert : certs.entrySet()) { + ks.setCertificateEntry(cert.getKey(), cert.getValue()); + } + saveKeyStore(ks, filename, password); + } + + public static SecurityConfig createSSLConfigFile(SSLFactory.Mode mode, String trustStoreFileClient) throws IOException, GeneralSecurityException { + Properties securityConfigProps = new Properties(); + Map certs = new HashMap(); + KeyPair keyPair = generateKeyPair("RSA"); + X509Certificate cert = generateCertificate("CN=localhost, O=localhost", keyPair, 30, "SHA1withRSA"); + String password = "test"; + + if (mode == SSLFactory.Mode.SERVER) { + File keyStoreFile = File.createTempFile("keystore", ".jks"); + createKeyStore(keyStoreFile.getPath(), password, password, "localhost", keyPair.getPrivate(), cert); + certs.put("localhost", cert); + securityConfigProps.put(SecurityConfig.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath()); + securityConfigProps.put(SecurityConfig.SSL_KEYSTORE_TYPE_CONFIG, "JKS"); + securityConfigProps.put(SecurityConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG, "SunX509"); + securityConfigProps.put(SecurityConfig.SSL_KEYSTORE_PASSWORD_CONFIG, password); + securityConfigProps.put(SecurityConfig.SSL_KEY_PASSWORD_CONFIG, password); + + File trustStoreFile = File.createTempFile("truststore", ".jks"); + createTrustStore(trustStoreFile.getPath(), password, certs); + + securityConfigProps.put(SecurityConfig.SECURITY_PROTOCOL_CONFIG, "SSL"); + securityConfigProps.put(SecurityConfig.SSL_CLIENT_REQUIRE_CERT_CONFIG, "false"); + securityConfigProps.put(SecurityConfig.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath()); + securityConfigProps.put(SecurityConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG, password); + securityConfigProps.put(SecurityConfig.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS"); + } else { + securityConfigProps.put(SecurityConfig.SECURITY_PROTOCOL_CONFIG, "SSL"); + securityConfigProps.put(SecurityConfig.SSL_CLIENT_REQUIRE_CERT_CONFIG, "false"); + securityConfigProps.put(SecurityConfig.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFileClient); + securityConfigProps.put(SecurityConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG, password); + securityConfigProps.put(SecurityConfig.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS"); + } + + securityConfigProps.put(SecurityConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, "SunX509"); + return new SecurityConfig(securityConfigProps); + } + +} -- 2.3.2 (Apple Git-55)