diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 122375c..74539dc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -166,6 +166,10 @@ public class ProducerConfig extends AbstractConfig {
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface.";
+ /** security.protocol */
+ public static final String SECURITY_PROTOCOL = "security.protocol";
+ private static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Currently only PLAINTEXT is supported. SSL and Kerberos are planned for the near future.";
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -214,7 +218,13 @@ public class ProducerConfig extends AbstractConfig {
Importance.LOW,
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
- .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
+ .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
+ .define(SECURITY_PROTOCOL,
+ Type.STRING,
+ "PLAINTEXT",
+ in("PLAINTEXT"),
+ Importance.MEDIUM,
+ SECURITY_PROTOCOL_DOC);
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
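
For illustration, a minimal sketch of configuring the new option on the new producer (assuming the patch is applied; the broker list, topic and serializer choices here are placeholders):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder broker
    props.put(ProducerConfig.SECURITY_PROTOCOL, "PLAINTEXT")              // only PLAINTEXT passes the in("PLAINTEXT") validator
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
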
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
new file mode 100644
index 0000000..5390993
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+public enum ApiVersion {
+ KAFKA_0820,
+ KAFKA_0830,
+ KAFKA_0900;
+
+ public boolean onOrAfter(ApiVersion other) {
+ return compareTo(other) >= 0;
+ }
+
+ public static ApiVersion parseConfig(String version) {
+ String[] vals = version.split("\\.");
+ StringBuilder parsed = new StringBuilder();
+ parsed.append("KAFKA_");
+
+ for (String v: vals) {
+ parsed.append(v);
+ }
+ return ApiVersion.valueOf(parsed.toString());
+ }
+}
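
A note on parseConfig: it simply strips the dots and prefixes "KAFKA_", so it expects the full four-part version string. A quick sketch of the intended mapping (assuming the patch is applied):

    import org.apache.kafka.common.protocol.ApiVersion

    val v = ApiVersion.parseConfig("0.8.3.0")    // "0.8.3.0" -> valueOf("KAFKA_0830")
    assert(v == ApiVersion.KAFKA_0830)
    assert(v.onOrAfter(ApiVersion.KAFKA_0820))   // ordering follows enum declaration order
    // parseConfig("0.8.3") would look up "KAFKA_083" and throw IllegalArgumentException,
    // so callers must pass the four-part version string.
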
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
new file mode 100644
index 0000000..677605c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public enum SecurityProtocol {
+ /** Currently identical to PLAINTEXT and used for testing. The plan is to add instrumentation in the future. */
+ TRACE(0, "TRACE"),
+ /** Un-authenticated, non-encrypted channel */
+ PLAINTEXT(1, "PLAINTEXT");
+
+ private static Map<Short, SecurityProtocol> codeToSecurityProtocol = new HashMap<Short, SecurityProtocol>();
+ private static List<String> names = new ArrayList<String>();
+
+ static {
+ for (SecurityProtocol proto: SecurityProtocol.values()) {
+ codeToSecurityProtocol.put(proto.id, proto);
+ names.add(proto.name);
+ }
+ }
+
+ /** the permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */
+ public final short id;
+
+ /** a name of the security protocol. This may be used by client configuration */
+ public final String name;
+
+ private SecurityProtocol(int id, String name) {
+ this.id = (short) id;
+ this.name = name;
+ }
+
+ public static String getName(int id) {
+ return codeToSecurityProtocol.get((short) id).name; // cast needed: the map is keyed by Short, and an int would autobox to Integer and miss
+ }
+
+ public static List getNames() {
+ return names;
+ }
+
+}
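
Usage is a straightforward id/name lookup; a short sketch:

    import org.apache.kafka.common.protocol.SecurityProtocol

    assert(SecurityProtocol.getName(1) == "PLAINTEXT")   // id -> name; ids must stay stable on the wire
    assert(SecurityProtocol.getNames.contains("TRACE"))  // all configurable protocol names
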
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 5d5f52c..6ac7d92 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
public class MetadataRequest extends AbstractRequestResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
- private static final String TOPICS_KEY_NAME = "topics";
+ private static String TOPICS_KEY_NAME = "topics";
private final List<String> topics;
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 69530c1..b245890 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.KafkaException;
public class Utils {
- private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)");
+ private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-z\\-.:]*)\\]?:([0-9]+)");
public static final String NL = System.getProperty("line.separator");
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 4c2ea34..dc69d14 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -31,9 +31,9 @@ public class UtilsTest {
@Test
public void testGetHost() {
assertEquals("127.0.0.1", getHost("127.0.0.1:8000"));
- assertEquals("mydomain.com", getHost("mydomain.com:8080"));
+ assertEquals("mydomain.com", getHost("PLAINTEXT://mydomain.com:8080"));
assertEquals("::1", getHost("[::1]:1234"));
- assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
+ assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
}
@Test
diff --git a/config/server.properties b/config/server.properties
index 1614260..80ee2fc 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -21,8 +21,10 @@ broker.id=0
############################# Socket Server Settings #############################
+listeners=PLAINTEXT://:9092
+
# The port the socket server listens on
-port=9092
+#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 28b12c7..50f7bdd 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,14 +18,15 @@
package kafka.admin
import kafka.common._
-import kafka.cluster.Broker
+import kafka.cluster.SecurityProtocol.SecurityProtocol
+import kafka.cluster.{BrokerEndpoint, Broker, SecurityProtocol}
+
import kafka.log.LogConfig
import kafka.utils.{Logging, ZkUtils, Json}
import kafka.api.{TopicMetadata, PartitionMetadata}
import java.util.Random
import java.util.Properties
-import scala.Some
import scala.Predef._
import scala.collection._
import mutable.ListBuffer
@@ -287,7 +288,9 @@ object AdminUtils extends Logging {
topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
}
- private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
+
+ private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
@@ -298,22 +301,22 @@ object AdminUtils extends Logging {
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
- var leaderInfo: Option[Broker] = None
- var replicaInfo: Seq[Broker] = Nil
- var isrInfo: Seq[Broker] = Nil
+ var leaderInfo: Option[BrokerEndpoint] = None
+ var replicaInfo: Seq[BrokerEndpoint] = Nil
+ var isrInfo: Seq[BrokerEndpoint] = Nil
try {
leaderInfo = leader match {
case Some(l) =>
try {
- Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+ Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head.getBrokerEndPoint(protocol))
} catch {
case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
}
case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
}
try {
- replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
- isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
+ replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)).map(_.getBrokerEndPoint(protocol))
+ isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas).map(_.getBrokerEndPoint(protocol))
} catch {
case e: Throwable => throw new ReplicaNotAvailableException(e)
}
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 285c033..17e50bf 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -196,9 +196,7 @@ object TopicCommand {
}
}
}
-
- def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
-
+
def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
require(configsToBeAdded.forall(config => config.length == 2),
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index 24aaf95..a3587e4 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -18,18 +18,18 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import kafka.common.ErrorMapping
object ConsumerMetadataResponse {
val CurrentVersion = 0
- private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1))
+ private val NoBrokerOpt = Some(BrokerEndpoint(id = -1, host = "", port = -1))
def readFrom(buffer: ByteBuffer) = {
val correlationId = buffer.getInt
val errorCode = buffer.getShort
- val broker = Broker.readFrom(buffer)
+ val broker = BrokerEndpoint.readFrom(buffer)
val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
Some(broker)
else
@@ -40,7 +40,7 @@ object ConsumerMetadataResponse {
}
-case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int)
+case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndpoint], errorCode: Short, correlationId: Int)
extends RequestOrResponse() {
def sizeInBytes =
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 4ff7e8f..bf93632 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -21,7 +21,7 @@ package kafka.api
import java.nio._
import kafka.utils._
import kafka.api.ApiUtils._
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.common.ErrorMapping
@@ -120,9 +120,9 @@ object LeaderAndIsrRequest {
}
val leadersCount = buffer.getInt
- var leaders = Set[Broker]()
+ var leaders = Set[BrokerEndpoint]()
for (i <- 0 until leadersCount)
- leaders += Broker.readFrom(buffer)
+ leaders += BrokerEndpoint.readFrom(buffer)
new LeaderAndIsrRequest(versionId, correlationId, clientId, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders)
}
@@ -134,10 +134,10 @@ case class LeaderAndIsrRequest (versionId: Short,
controllerId: Int,
controllerEpoch: Int,
partitionStateInfos: Map[(String, Int), PartitionStateInfo],
- leaders: Set[Broker])
+ leaders: Set[BrokerEndpoint])
extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
- def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int,
+ def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[BrokerEndpoint], controllerId: Int,
controllerEpoch: Int, correlationId: Int, clientId: String) = {
this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId,
controllerId, controllerEpoch, partitionStateInfos, leaders)
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 0190076..76f8bc9 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -17,18 +17,17 @@
package kafka.api
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.utils.Logging
import kafka.common._
-import org.apache.kafka.common.utils.Utils._
object TopicMetadata {
val NoLeaderNodeId = -1
- def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): TopicMetadata = {
+ def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): TopicMetadata = {
val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
val topic = readShortString(buffer)
val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
@@ -89,7 +88,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
object PartitionMetadata {
- def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): PartitionMetadata = {
+ def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): PartitionMetadata = {
val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
val leaderId = buffer.getInt
@@ -110,10 +109,11 @@ object PartitionMetadata {
}
case class PartitionMetadata(partitionId: Int,
- val leader: Option[Broker],
- replicas: Seq[Broker],
- isr: Seq[Broker] = Seq.empty,
+ val leader: Option[BrokerEndpoint],
+ replicas: Seq[BrokerEndpoint],
+ isr: Seq[BrokerEndpoint] = Seq.empty,
errorCode: Short = ErrorMapping.NoError) extends Logging {
+
def sizeInBytes: Int = {
2 /* error code */ +
4 /* partition id */ +
@@ -142,14 +142,13 @@ case class PartitionMetadata(partitionId: Int,
override def toString(): String = {
val partitionMetadataString = new StringBuilder
partitionMetadataString.append("\tpartition " + partitionId)
- partitionMetadataString.append("\tleader: " + (if(leader.isDefined) formatBroker(leader.get) else "none"))
- partitionMetadataString.append("\treplicas: " + replicas.map(formatBroker).mkString(","))
- partitionMetadataString.append("\tisr: " + isr.map(formatBroker).mkString(","))
+ partitionMetadataString.append("\tleader: " + (if(leader.isDefined) leader.get.toString else "none"))
+ partitionMetadataString.append("\treplicas: " + replicas.mkString(","))
+ partitionMetadataString.append("\tisr: " + isr.mkString(","))
partitionMetadataString.append("\tisUnderReplicated: %s".format(if(isr.size < replicas.size) "true" else "false"))
partitionMetadataString.toString()
}
- private def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
}
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index 92ac4e6..4de566b 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -17,7 +17,7 @@
package kafka.api
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import java.nio.ByteBuffer
object TopicMetadataResponse {
@@ -25,7 +25,7 @@ object TopicMetadataResponse {
def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
val correlationId = buffer.getInt
val brokerCount = buffer.getInt
- val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer))
+ val brokers = (0 until brokerCount).map(_ => BrokerEndpoint.readFrom(buffer))
val brokerMap = brokers.map(b => (b.id, b)).toMap
val topicCount = buffer.getInt
val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
@@ -33,14 +33,16 @@ object TopicMetadataResponse {
}
}
-case class TopicMetadataResponse(brokers: Seq[Broker],
+case class TopicMetadataResponse(brokers: Seq[BrokerEndpoint],
topicsMetadata: Seq[TopicMetadata],
correlationId: Int)
extends RequestOrResponse() {
+
val sizeInBytes: Int = {
4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
}
+
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
/* brokers */
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 530982e..cef3ec9 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -17,15 +17,15 @@
package kafka.api
import java.nio.ByteBuffer
+import kafka.cluster.{SecurityProtocol, BrokerEndpoint, Broker}
import kafka.api.ApiUtils._
-import kafka.cluster.Broker
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.{KafkaException, ErrorMapping, TopicAndPartition}
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.network.RequestChannel.Response
import collection.Set
object UpdateMetadataRequest {
- val CurrentVersion = 0.shortValue
+ val CurrentVersion = 1.shortValue
val IsInit: Boolean = true
val NotInit: Boolean = false
val DefaultAckTimeout: Int = 1000
@@ -48,7 +48,14 @@ object UpdateMetadataRequest {
}
val numAliveBrokers = buffer.getInt
- val aliveBrokers = for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer)
+
+ val aliveBrokers = versionId match {
+ case 0 => for(i <- 0 until numAliveBrokers) yield new Broker(BrokerEndpoint.readFrom(buffer), SecurityProtocol.PLAINTEXT)
+ case 1 => for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer)
+ case v => throw new KafkaException("Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
+ }
+
new UpdateMetadataRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
partitionStateInfos.toMap, aliveBrokers.toSet)
}
@@ -82,7 +89,14 @@ case class UpdateMetadataRequest (versionId: Short,
value.writeTo(buffer)
}
buffer.putInt(aliveBrokers.size)
- aliveBrokers.foreach(_.writeTo(buffer))
+
+ versionId match {
+ case 0 => aliveBrokers.foreach(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).writeTo(buffer))
+ case 1 => aliveBrokers.foreach(_.writeTo(buffer))
+ case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
+ }
+
+
}
def sizeInBytes(): Int = {
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ebba87f..614d4b8 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -18,17 +18,17 @@
import scala.collection._
import kafka.cluster._
+import kafka.cluster.SecurityProtocol.SecurityProtocol
import kafka.api._
import kafka.producer._
import kafka.common.{ErrorMapping, KafkaException}
import kafka.utils.{Utils, Logging}
import java.util.Properties
import util.Random
- import kafka.network.BlockingChannel
- import kafka.utils.ZkUtils._
- import org.I0Itec.zkclient.ZkClient
- import java.io.IOException
-import org.apache.kafka.common.utils.Utils.{getHost, getPort}
+import kafka.network.BlockingChannel
+import kafka.utils.ZkUtils._
+import org.I0Itec.zkclient.ZkClient
+import java.io.IOException
/**
* Helper functions common to clients (producer, consumer, or admin)
@@ -42,7 +42,7 @@ object ClientUtils extends Logging{
* @param producerConfig The producer's config
* @return topic metadata response
*/
- def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
+ def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
@@ -83,7 +83,7 @@ object ClientUtils extends Logging{
* @param clientId The client's identifier
* @return topic metadata response
*/
- def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
+ def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], clientId: String, timeoutMs: Int,
correlationId: Int = 0): TopicMetadataResponse = {
val props = new Properties()
props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
@@ -96,22 +96,22 @@ object ClientUtils extends Logging{
/**
* Parse a list of broker urls in the form host1:port1, host2:port2, ...
*/
- def parseBrokerList(brokerListStr: String): Seq[Broker] = {
+ def parseBrokerList(brokerListStr: String): Seq[BrokerEndpoint] = {
val brokersStr = Utils.parseCsvList(brokerListStr)
brokersStr.zipWithIndex.map { case (address, brokerId) =>
- new Broker(brokerId, getHost(address), getPort(address))
+ BrokerEndpoint.createBrokerEndPoint(brokerId, address)
}
}
/**
* Creates a blocking channel to a random broker
*/
- def channelToAnyBroker(zkClient: ZkClient, socketTimeoutMs: Int = 3000) : BlockingChannel = {
+ def channelToAnyBroker(zkClient: ZkClient, protocolType: SecurityProtocol, socketTimeoutMs: Int = 3000) : BlockingChannel = {
var channel: BlockingChannel = null
var connected = false
while (!connected) {
- val allBrokers = getAllBrokersInCluster(zkClient)
+ val allBrokers = getAllBrokerEndPointsForChannel(zkClient, protocolType)
Random.shuffle(allBrokers).find { broker =>
trace("Connecting to broker %s:%d.".format(broker.host, broker.port))
try {
@@ -136,19 +136,19 @@ object ClientUtils extends Logging{
/**
* Creates a blocking channel to the offset manager of the given group
*/
- def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
- var queryChannel = channelToAnyBroker(zkClient)
+ def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000, protocolType: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
+ var queryChannel = channelToAnyBroker(zkClient, protocolType)
var offsetManagerChannelOpt: Option[BlockingChannel] = None
while (!offsetManagerChannelOpt.isDefined) {
- var coordinatorOpt: Option[Broker] = None
+ var coordinatorOpt: Option[BrokerEndpoint] = None
while (!coordinatorOpt.isDefined) {
try {
if (!queryChannel.isConnected)
- queryChannel = channelToAnyBroker(zkClient)
+ queryChannel = channelToAnyBroker(zkClient, protocolType)
debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
queryChannel.send(ConsumerMetadataRequest(group))
val response = queryChannel.receive()
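
With these changes parseBrokerList yields BrokerEndpoints rather than Brokers; note that the ids are just positional indexes into the list, not real broker ids. A sketch:

    import kafka.client.ClientUtils

    val endpoints = ClientUtils.parseBrokerList("broker1:9092,broker2:9093")
    // -> Seq(BrokerEndpoint(0, "broker1", 9092), BrokerEndpoint(1, "broker2", 9093))
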
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 0060add..4104114 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -18,17 +18,40 @@
package kafka.cluster
import kafka.utils.Utils._
-import kafka.utils.Json
-import kafka.api.ApiUtils._
+import kafka.utils.{Utils, Json}
import java.nio.ByteBuffer
-import kafka.common.{KafkaException, BrokerNotAvailableException}
-import org.apache.kafka.common.utils.Utils._
+import kafka.common.{BrokerEndPointNotAvailableException, KafkaException, BrokerNotAvailableException}
+import kafka.cluster.SecurityProtocol._
/**
* A Kafka broker
+ * A broker has an id and a collection of end-points,
+ * where each end-point is a (host, port, protocolType) triple.
+ * Currently the only protocol type is PLAINTEXT,
+ * but SSL and Kerberos will be added in the future.
*/
object Broker {
+ /**
+ * Create a broker object from id and JSON string
+ * @param id
+ * @param brokerInfoString
+ *
+ * Version 1 JSON schema for a broker is:
+ * {"version":1,
+ * "host":"localhost",
+ * "port":9092
+ * "jmx_port":9999,
+ * "timestamp":"2233345666" }
+ *
+ * The current JSON schema for a broker is:
+ * {"version":2,
+ * "host","localhost",
+ * "port",9092
+ * "jmx_port":9999,
+ * "timestamp":"2233345666",
+ * "endpoints": "{PLAINTEXT://host1:9092,SSL://host1:9093"}
+ */
def createBroker(id: Int, brokerInfoString: String): Broker = {
if(brokerInfoString == null)
throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
@@ -36,9 +59,18 @@ object Broker {
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
- val host = brokerInfo.get("host").get.asInstanceOf[String]
- val port = brokerInfo.get("port").get.asInstanceOf[Int]
- new Broker(id, host, port)
+ val version = brokerInfo.get("version").get.asInstanceOf[Int]
+ val endpoints = version match {
+ case 1 =>
+ val host = brokerInfo.get("host").get.asInstanceOf[String]
+ val port = brokerInfo.get("port").get.asInstanceOf[Int]
+ Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT))
+ case 2 =>
+ val listeners = brokerInfo.get("endpoints").get.asInstanceOf[String]
+ Utils.listenerListToEndPoints(listeners)
+ case _ => throw new KafkaException("Unknown version of broker registration. Only versions 1 and 2 are supported: " + brokerInfoString)
+ }
+ new Broker(id, endpoints)
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
}
@@ -47,36 +79,79 @@ object Broker {
}
}
+ /**
+ *
+ * @param buffer containing serialized broker
+ * current serialization is:
+ * id (int), host (size + string), number of endpoints (int), serialized endpoints
+ * @return broker object
+ */
def readFrom(buffer: ByteBuffer): Broker = {
val id = buffer.getInt
- val host = readShortString(buffer)
- val port = buffer.getInt
- new Broker(id, host, port)
+ val numEndpoints = buffer.getInt
+
+ val endpoints = List.range(0, numEndpoints).map(i => EndPoint.readFrom(buffer))
+ .map(ep => ep.protocolType -> ep).toMap
+ new Broker(id, endpoints)
}
}
-case class Broker(id: Int, host: String, port: Int) {
-
- override def toString: String = "id:" + id + ",host:" + host + ",port:" + port
+case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) {
+
+ override def toString: String = id + " : " + endPoints.values.mkString("(",",",")")
+
+ def this(id: Int, host: String, port: Int, protocol: SecurityProtocol) = {
+ this(id, Map(protocol -> EndPoint(host, port, protocol)))
+ }
+
+ def this(bep: BrokerEndpoint, protocol: SecurityProtocol) = {
+ this(bep.id, bep.host, bep.port, protocol)
+ }
- def connectionString: String = formatAddress(host, port)
def writeTo(buffer: ByteBuffer) {
buffer.putInt(id)
- writeShortString(buffer, host)
- buffer.putInt(port)
+ buffer.putInt(endPoints.size)
+ for(endpoint <- endPoints.values) {
+ endpoint.writeTo(buffer)
+ }
+ }
+
+ def sizeInBytes: Int =
+ 4 + /* broker id*/
+ 4 + /* number of endPoints */
+ endPoints.values.map(_.sizeInBytes).sum /* end points */
+
+ def supportsChannel(protocolType: SecurityProtocol): Boolean = {
+ endPoints.contains(protocolType)
}
- def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
+ def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndpoint = {
+ val endpoint = endPoints.get(protocolType)
+ endpoint match {
+ case Some(endpoint) => new BrokerEndpoint(id, endpoint.host, endpoint.port)
+ case None =>
+ throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType, id))
+ }
+ }
override def equals(obj: Any): Boolean = {
obj match {
case null => false
- case n: Broker => id == n.id && host == n.host && port == n.port
+ // endPoints is a Map, which Scala compares by content
+ case n: Broker => id == n.id && endPoints == n.endPoints
case _ => false
}
}
-
- override def hashCode(): Int = hashcode(id, host, port)
+
+ override def hashCode(): Int = hashcode(id, endPoints)
}
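
A broker now carries one end-point per security protocol. A minimal round-trip sketch over the new serialization format (assuming the patch is applied):

    import java.nio.ByteBuffer
    import kafka.cluster.{Broker, SecurityProtocol}

    val broker = new Broker(0, "localhost", 9092, SecurityProtocol.PLAINTEXT)
    val buffer = ByteBuffer.allocate(broker.sizeInBytes)
    broker.writeTo(buffer)                    // id, endpoint count, then each endpoint
    buffer.rewind()
    assert(Broker.readFrom(buffer) == broker)

    // The protocol-specific connection view handed to clients:
    val bep = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)  // BrokerEndpoint(0, "localhost", 9092)
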
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
new file mode 100644
index 0000000..7b9198b
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.KafkaException
+import org.apache.kafka.common.utils.Utils._
+
+object BrokerEndpoint {
+ def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndpoint = {
+ val uriParseExp = """.*?\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r
+
+ connectionString match {
+ case uriParseExp(host, port) => new BrokerEndpoint(brokerId, host, port.toInt)
+ case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
+ }
+ }
+
+ /**
+ * BrokerEndpoint is used to connect to a specific host:port pair.
+ * It is typically used by clients (or by brokers connecting to other brokers)
+ * and carries no information about the security protocol of the connection;
+ * clients are expected to know which security protocol to use from their own configuration.
+ * This lets us keep the client-facing wire protocol unchanged where the protocol is not needed.
+ * @param buffer
+ * @return
+ */
+ def readFrom(buffer: ByteBuffer): BrokerEndpoint = {
+ val brokerId = buffer.getInt()
+ val host = readShortString(buffer)
+ val port = buffer.getInt()
+ BrokerEndpoint(brokerId, host, port)
+ }
+}
+
+// Utility class, representing a particular method of connecting to a broker
+// Mostly to be used by clients
+// This is not a broker and is not stored in ZooKeeper
+case class BrokerEndpoint(id: Int, host: String, port: Int) {
+
+ def connectionString(): String = formatAddress(host, port)
+
+ def writeTo(buffer: ByteBuffer): Unit = {
+ buffer.putInt(id)
+ writeShortString(buffer, host)
+ buffer.putInt(port)
+ }
+
+ def sizeInBytes: Int =
+ 4 + /* broker Id */
+ 4 + /* port */
+ shortStringLength(host)
+}
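
The parse regex deliberately tolerates an optional protocol prefix and bracketed IPv6 literals; a sketch of both forms:

    import kafka.cluster.BrokerEndpoint

    val b1 = BrokerEndpoint.createBrokerEndPoint(1, "PLAINTEXT://localhost:9092")
    assert(b1.host == "localhost" && b1.port == 9092)  // the leading .*? swallows the protocol prefix

    val b2 = BrokerEndpoint.createBrokerEndPoint(2, "[::1]:9092")
    assert(b2.host == "::1")                           // brackets are stripped from IPv6 hosts
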
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
new file mode 100644
index 0000000..012d1fd
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.KafkaException
+import kafka.cluster.SecurityProtocol._
+
+object EndPoint {
+
+ def readFrom(buffer: ByteBuffer): EndPoint = {
+ val port = buffer.getInt()
+ val host = readShortString(buffer)
+ val protocol = buffer.getShort()
+ EndPoint(host, port, SecurityProtocol(protocol))
+ }
+
+ def createEndPoint(connectionString: String): EndPoint = {
+ val uriParseExp = """^(.*)://\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r
+ connectionString match {
+ case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.withName(protocol))
+ case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.withName(protocol))
+ case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
+ }
+ }
+}
+
+/**
+ * Part of the broker definition - matching host/port pair to a protocol
+ */
+case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) {
+
+ override def toString: String = {
+ val hostStr = if (host == null || host.contains(":")) "[" + host + "]" else host
+ protocolType + "://" + hostStr + ":" + port
+ }
+
+ def writeTo(buffer: ByteBuffer): Unit = {
+ buffer.putInt(port)
+ writeShortString(buffer, host)
+ buffer.putShort(protocolType.id.toShort)
+ }
+
+ def sizeInBytes: Int =
+ 4 + /* port */
+ shortStringLength(host) +
+ 2 /* protocol id */
+}
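
Unlike BrokerEndpoint, createEndPoint requires the protocol prefix, and an empty host maps to null (the bind-to-all-interfaces form used in the listeners config). A sketch:

    import kafka.cluster.EndPoint

    val ep = EndPoint.createEndPoint("PLAINTEXT://localhost:9092")
    assert(ep.toString == "PLAINTEXT://localhost:9092")               // toString round-trips the listener syntax
    assert(EndPoint.createEndPoint("PLAINTEXT://:9092").host == null) // matches listeners=PLAINTEXT://:9092
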
diff --git a/core/src/main/scala/kafka/cluster/SecurityProtocol.scala b/core/src/main/scala/kafka/cluster/SecurityProtocol.scala
new file mode 100644
index 0000000..ca7105c
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/SecurityProtocol.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+
+object SecurityProtocol extends Enumeration {
+
+ type SecurityProtocol = Value
+ val TRACE = Value
+ val PLAINTEXT = Value
+}
+
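
The Scala Enumeration mirrors the Java enum on the clients side; ids are assigned in declaration order (TRACE=0, PLAINTEXT=1), so the two must be kept in sync for the wire format. A sketch:

    import kafka.cluster.SecurityProtocol

    assert(SecurityProtocol.withName("PLAINTEXT") == SecurityProtocol.PLAINTEXT) // name lookup, as in ConsumerConfig
    assert(SecurityProtocol.PLAINTEXT.id == 1)            // must match org.apache.kafka.common.protocol.SecurityProtocol
    assert(SecurityProtocol(0) == SecurityProtocol.TRACE) // id lookup, as in EndPoint.readFrom
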
diff --git a/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala b/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala
new file mode 100644
index 0000000..455d8c6
--- /dev/null
+++ b/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class BrokerEndPointNotAvailableException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 9ebbee6..7ebb240 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -19,6 +19,7 @@ package kafka.consumer
import java.util.Properties
import kafka.api.OffsetRequest
+import kafka.cluster.SecurityProtocol
import kafka.utils._
import kafka.common.{InvalidConfigException, Config}
@@ -180,6 +181,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
/** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
+
+ /* Security protocol used to communicate with brokers; currently only PLAINTEXT is supported */
+ val securityProtocol = SecurityProtocol.withName(props.getString("security.protocol", "PLAINTEXT"))
validate(this)
}
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b9e2bea..59e9876 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -19,9 +19,8 @@ package kafka.consumer
import org.I0Itec.zkclient.ZkClient
import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
-import kafka.cluster.{Cluster, Broker}
+import kafka.cluster.{BrokerEndpoint, Cluster}
import scala.collection.immutable
-import scala.collection.Map
import collection.mutable.HashMap
import scala.collection.mutable
import java.util.concurrent.locks.ReentrantLock
@@ -53,7 +52,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
// thread responsible for adding the fetcher to the right broker when leader is available
override def doWork() {
- val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
+ val leaderForPartitionsMap = new HashMap[TopicAndPartition, BrokerEndpoint]
lock.lock()
try {
while (noLeaderPartitionSet.isEmpty) {
@@ -62,7 +61,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
trace("Partitions without leader %s".format(noLeaderPartitionSet))
- val brokers = getAllBrokersInCluster(zkClient)
+ val brokers = getAllBrokerEndPointsForChannel(zkClient, config.securityProtocol)
val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
brokers,
config.clientId,
@@ -114,7 +113,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
}
- override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+ override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = {
new ConsumerFetcherThread(
"ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
config, sourceBroker, partitionMap, this)
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index ee6139c..a51a678 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -17,7 +17,7 @@
package kafka.consumer
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import kafka.server.AbstractFetcherThread
import kafka.message.ByteBufferMessageSet
import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData}
@@ -26,7 +26,7 @@ import kafka.common.TopicAndPartition
class ConsumerFetcherThread(name: String,
val config: ConsumerConfig,
- sourceBroker: Broker,
+ sourceBroker: BrokerEndpoint,
partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
val consumerFetcherManager: ConsumerFetcherManager)
extends AbstractFetcherThread(name = name,
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 5487259..531b25d 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -185,7 +185,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def ensureOffsetManagerConnected() {
if (config.offsetsStorage == "kafka") {
if (offsetsChannel == null || !offsetsChannel.isConnected)
- offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient, config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs)
+ offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient,
+ config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs, config.securityProtocol)
debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port))
}
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 14b22ab..5864967 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -18,6 +18,7 @@ package kafka.controller
import kafka.network.{Receive, BlockingChannel}
import kafka.utils.{Utils, Logging, ShutdownableThread}
+import org.apache.kafka.common.protocol.ApiVersion
import collection.mutable.HashMap
import kafka.cluster.Broker
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
@@ -80,7 +81,8 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
private def addNewBroker(broker: Broker) {
val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id))
- val channel = new BlockingChannel(broker.host, broker.port,
+ val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
+ val channel = new BlockingChannel(brokerEndPoint.host, brokerEndPoint.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
@@ -278,7 +280,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
val broker = m._1
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
- val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
+ val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol))
val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
for (p <- partitionStateInfos) {
val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
@@ -293,8 +295,10 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
updateMetadataRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2.toMap
- val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
- partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
+
+ val versionId = if (controller.config.intraBrokerProtocolVersion.onOrAfter(ApiVersion.KAFKA_0830)) 1 else 0
+ val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch,
+ correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers)
partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
"correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
correlationId, broker, p._1)))
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 66df6d2..6cf4c84 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -212,7 +212,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
def epoch = controllerContext.epoch
- def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
+ def clientId = {
+ val listeners = listenerListToEndPoints(config.listeners)
+ val controllerListener = listeners.get(config.interBrokerSecurityProtocol)
+ "id_%d-host_%s-port_%d".format(config.brokerId, controllerListener.get.host, controllerListener.get.port)
+ }
/**
* On clean shutdown, the controller first determines the partitions that the
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
index d281bb3..9c14428 100644
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -18,14 +18,13 @@
package kafka.javaapi
import java.nio.ByteBuffer
-
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) {
def errorCode = underlying.errorCode
- def coordinator: Broker = {
+ def coordinator: BrokerEndpoint = {
import kafka.javaapi.Implicits._
underlying.coordinatorOpt
}
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index f384e04..ebbd589 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -16,7 +16,7 @@
*/
package kafka.javaapi
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import scala.collection.JavaConversions
private[javaapi] object MetadataListImplicits {
@@ -52,17 +52,17 @@ class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
def partitionId: Int = underlying.partitionId
- def leader: Broker = {
+ def leader: BrokerEndpoint = {
import kafka.javaapi.Implicits._
underlying.leader
}
- def replicas: java.util.List[Broker] = {
+ def replicas: java.util.List[BrokerEndpoint] = {
import JavaConversions._
underlying.replicas
}
- def isr: java.util.List[Broker] = {
+ def isr: java.util.List[BrokerEndpoint] = {
import JavaConversions._
underlying.isr
}
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7b1db3d..c4bad46 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -18,6 +18,8 @@
package kafka.network
import java.util.concurrent._
+import kafka.cluster.SecurityProtocol
+import kafka.cluster.SecurityProtocol.SecurityProtocol
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.nio.ByteBuffer
@@ -30,7 +32,7 @@ import org.apache.log4j.Logger
object RequestChannel extends Logging {
- val AllDone = new Request(1, 2, getShutdownReceive(), 0)
+ val AllDone = new Request(processor = 1, requestKey = 2, buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
def getShutdownReceive() = {
val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -41,7 +43,7 @@ object RequestChannel extends Logging {
byteBuffer
}
- case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
+ case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0), securityProtocol: SecurityProtocol) {
@volatile var requestDequeueTimeMs = -1L
@volatile var apiLocalCompleteTimeMs = -1L
@volatile var responseCompleteTimeMs = -1L
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 39b1651..ff0c1ab 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -24,7 +24,11 @@ import java.net._
import java.io._
import java.nio.channels._
+import kafka.cluster.{SecurityProtocol, EndPoint}
+import kafka.cluster.SecurityProtocol.SecurityProtocol
+
import scala.collection._
+import scala.collection.JavaConversions._
import kafka.common.KafkaException
import kafka.metrics.KafkaMetricsGroup
@@ -38,8 +42,7 @@ import com.yammer.metrics.core.{Gauge, Meter}
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/
class SocketServer(val brokerId: Int,
- val host: String,
- val port: Int,
+ val endpoints: Map[SecurityProtocol, EndPoint],
val numProcessorThreads: Int,
val maxQueuedRequests: Int,
val sendBufferSize: Int,
@@ -51,28 +54,39 @@ class SocketServer(val brokerId: Int,
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
- @volatile private var acceptor: Acceptor = null
+ @volatile private var acceptors: ConcurrentHashMap[EndPoint, Acceptor] = new ConcurrentHashMap[EndPoint, Acceptor]()
val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
/* a meter to track the average free capacity of the network processors */
private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+
+ /* The port-to-protocol mapping is pushed down to the processor level,
+ so the processor can record the correct protocol in the request channel.
+ We will probably find a more elegant way to do this once the request channel
+ is extended to carry more information about security and authentication.
+ TODO: re-consider this code when working on KAFKA-1683
+ */
+ private val portToProtocol: Map[Int, SecurityProtocol] =
+ endpoints.map { case (protocol: SecurityProtocol, endpoint: EndPoint) => endpoint.port -> protocol }
+
/**
* Start the socket server
*/
def startup() {
val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
for(i <- 0 until numProcessorThreads) {
- processors(i) = new Processor(i,
- time,
- maxRequestSize,
+ processors(i) = new Processor(i,
+ time,
+ maxRequestSize,
aggregateIdleMeter,
newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
- numProcessorThreads,
+ numProcessorThreads,
requestChannel,
quotas,
- connectionsMaxIdleMs)
- Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
+ connectionsMaxIdleMs,
+ portToProtocol)
+ Utils.newThread("kafka-network-thread-%d".format(i), processors(i), false).start()
}
newGauge("ResponsesBeingSent", new Gauge[Int] {
@@ -83,10 +97,17 @@ class SocketServer(val brokerId: Int,
requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
// start accepting connections
- this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
- Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
- acceptor.awaitStartup
- info("Started")
+ // For now the same processors serve all ports, since only one protocol is implemented;
+ // in the future we may dedicate processors to SSL and Kerberos
+
+ endpoints.values.foreach(endpoint => {
+ val acceptor = new Acceptor(endpoint.host, endpoint.port, processors, sendBufferSize, recvBufferSize, quotas)
+ acceptors.put(endpoint, acceptor)
+ Utils.newThread("kafka-socket-acceptor-%s-%d".format(endpoint.protocolType.toString, endpoint.port), acceptor, false).start()
+ acceptor.awaitStartup
+ })
+
+ info("Started " + acceptors.size() + " acceptor threads")
}
/**
@@ -94,8 +115,8 @@ class SocketServer(val brokerId: Int,
*/
def shutdown() = {
info("Shutting down")
- if(acceptor != null)
- acceptor.shutdown()
+ if(acceptors != null)
+ acceptors.values().foreach(_.shutdown())
for(processor <- processors)
processor.shutdown()
info("Shutdown completed")
@@ -301,7 +322,8 @@ private[kafka] class Processor(val id: Int,
val totalProcessorThreads: Int,
val requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas,
- val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) {
+ val connectionsMaxIdleMs: Long,
+ val portToProtocol: Map[Int,SecurityProtocol]) extends AbstractServerThread(connectionQuotas) {
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
private val connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000
@@ -447,7 +469,9 @@ private[kafka] class Processor(val id: Int,
if(read < 0) {
close(key)
} else if(receive.complete) {
- val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
+ val port = socketChannel.socket().getLocalPort
+ val protocol = portToProtocol(port)
+ val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address, securityProtocol = protocol)
requestChannel.sendRequest(req)
key.attach(null)
// explicitly reset interest ops to not READ, no need to wake up the selector just yet
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index 43df70b..362623f 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -17,7 +17,7 @@
package kafka.producer
-import kafka.cluster.Broker
+import kafka.cluster.{BrokerEndpoint, Broker}
import java.util.Properties
import collection.mutable.HashMap
import java.lang.Object
@@ -30,7 +30,7 @@ object ProducerPool {
/**
* Used in ProducerPool to initiate a SyncProducer connection with a broker.
*/
- def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
+ def createSyncProducer(config: ProducerConfig, broker: BrokerEndpoint): SyncProducer = {
val props = new Properties()
props.put("host", broker.host)
props.put("port", broker.port.toString)
@@ -44,11 +44,12 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
private val lock = new Object()
def updateProducer(topicMetadata: Seq[TopicMetadata]) {
- val newBrokers = new collection.mutable.HashSet[Broker]
+ val newBrokers = new collection.mutable.HashSet[BrokerEndpoint]
topicMetadata.foreach(tmd => {
tmd.partitionsMetadata.foreach(pmd => {
- if(pmd.leader.isDefined)
- newBrokers+=(pmd.leader.get)
+ if(pmd.leader.isDefined) {
+ newBrokers += pmd.leader.get
+ }
})
})
lock synchronized {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 20c00cb..94aa952 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.collection.Set
import scala.collection.Map
import kafka.utils.{Utils, Logging}
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
import com.yammer.metrics.core.Gauge
@@ -68,7 +68,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
}
// to be defined in subclass to create a specific fetcher
- def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
+ def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread
def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
mapLock synchronized {
@@ -126,6 +126,6 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
}
}
-case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
+case class BrokerAndFetcherId(broker: BrokerEndpoint, fetcherId: Int)
-case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
\ No newline at end of file
+case class BrokerAndInitialOffset(broker: BrokerEndpoint, initOffset: Long)
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8c281d4..210369a 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -17,7 +17,7 @@
package kafka.server
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import kafka.utils.{Pool, ShutdownableThread}
import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
@@ -36,7 +36,7 @@ import com.yammer.metrics.core.Gauge
/**
* Abstract class for fetching data from multiple partitions from the same broker.
*/
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndpoint, socketTimeout: Int, socketBufferSize: Int,
fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
isInterruptible: Boolean = true)
extends ShutdownableThread(name, isInterruptible) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f2b027b..b8a7093 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -20,10 +20,9 @@ package kafka.server
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.JoinGroupResponse
import org.apache.kafka.common.requests.HeartbeatResponse
-import org.apache.kafka.common.requests.ResponseHeader
-import org.apache.kafka.common.protocol.types.Struct
import kafka.api._
+import kafka.cluster.SecurityProtocol.SecurityProtocol
import kafka.common._
import kafka.log._
import kafka.network._
@@ -32,9 +31,6 @@ import kafka.network.RequestChannel.Response
import kafka.controller.KafkaController
import kafka.utils.{SystemTime, Logging}
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
import scala.collection._
import org.I0Itec.zkclient.ZkClient
@@ -355,8 +351,8 @@ class KafkaApis(val requestChannel: RequestChannel,
ret.toSeq.sortBy(- _)
}
- private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
- val topicResponses = metadataCache.getTopicMetadata(topics)
+ private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[TopicMetadata] = {
+ val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol)
if (topics.size > 0 && topicResponses.size != topics.size) {
val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
@@ -398,10 +394,10 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
- val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
+ val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol)
val brokers = metadataCache.getAliveBrokers
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
- val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)
+ val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata, metadataRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
@@ -438,7 +434,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val partition = offsetManager.partitionFor(consumerMetadataRequest.group)
// get metadata (and create the topic if necessary)
- val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
+ val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).head
val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6d74983..bcc3596 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,22 +18,62 @@
package kafka.server
import java.util.Properties
+import kafka.cluster.{EndPoint, SecurityProtocol}
+import kafka.common.InvalidConfigException
import kafka.message.{MessageSet, Message}
import kafka.consumer.ConsumerConfig
import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
import kafka.message.NoCompressionCodec
import kafka.message.BrokerCompressionCodec
+import org.apache.kafka.common.protocol.ApiVersion
/**
* Configuration settings for the kafka server
*/
class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
+ def validate(config: KafkaConfig) {
+ validateUniquePortAndProtocol(config.advertisedListeners)
+ }
+
+ def validateUniquePortAndProtocol(listeners: String) {
+ val listenerList = Utils.parseCsvList(listeners)
+ val endpoints = listenerList.map(listener => EndPoint.createEndPoint(listener))
+ val distinctPorts = endpoints.map(ep => ep.port).distinct
+ val distinctProtocols = endpoints.map(ep => ep.protocolType).distinct
+
+ if (distinctPorts.size < endpoints.size) throw new InvalidConfigException("Only one listener is allowed per port. Please check listeners for duplicates")
+ if (distinctProtocols.size < endpoints.size) throw new InvalidConfigException("Only one listener is allowed per protocol. Please check listeners for duplicates")
+ }
+
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
props.verify()
}
+ // If the user did not define listeners but did define host or port, let's use them in a backward-compatible way
+ // If none of those are defined, we default to PLAINTEXT://null:6667
+ private def getListeners(): String = {
+ if (props.containsKey("listeners")) {
+ props.getString("listeners")
+ } else {
+ "PLAINTEXT://" + props.getString("host.name", null) + ":" + props.getInt("port", 6667).toString
+ }
+ }
+
+ // If the user defined advertised listeners, we use those
+ // If they didn't, but did define advertised host or port, we'll use those, filling in missing values from the regular host / port or the defaults
+ // If none of these are defined, we fall back to the listeners
+ private def getAdvertisedListeners(): String = {
+ if (props.containsKey("advertised.listeners")) {
+ props.getString("advertised.listeners")
+ } else if (props.containsKey("advertised.host.name") || props.containsKey("advertised.port") ) {
+ "PLAINTEXT://" + props.getString("advertised.host.name", props.getString("host.name", null)) + ":" + props.getInt("advertised.port", props.getInt("port", 6667)).toString
+ } else {
+ getListeners()
+ }
+ }
+
private def getLogRetentionTimeMillis(): Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
@@ -99,23 +139,20 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/*********** Socket Server Configuration ***********/
- /* the port to listen and accept connections on */
- val port: Int = props.getInt("port", 9092)
-
- /* hostname of broker. If this is set, it will only bind to this address. If this is not set,
- * it will bind to all interfaces */
- val hostName: String = props.getString("host.name", null)
-
- /* hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may
- * need to be different from the interface to which the broker binds. If this is not set,
- * it will use the value for "host.name" if configured. Otherwise
- * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */
- val advertisedHostName: String = props.getString("advertised.host.name", hostName)
-
- /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may
- * need to be different from the port to which the broker binds. If this is not set,
- * it will publish the same port that the broker binds to. */
- val advertisedPort: Int = props.getInt("advertised.port", port)
+ /* Listener List - Comma-separated list of URIs we will listen on and their protocols.
+ * Specify hostname as 0.0.0.0 to bind to all interfaces
+ * Leave hostname empty to bind to default interface
+ *
+ * examples of legal listener lists:
+ * PLAINTEXT://myhost:9092,TRACE://:9091
+ * PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093
+ * */
+ val listeners: String = getListeners()
+
+ /* Listeners to publish to ZooKeeper for clients to use, if different than the listeners above.
+ * In IaaS environments, this may need to be different from the interface to which the broker binds.
+ * If this is not set, it will use the value for "listeners" */
+ val advertisedListeners: String = getAdvertisedListeners()
/* the SO_SNDBUFF buffer of the socket sever sockets */
val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024)
@@ -135,6 +172,16 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* idle connections timeout: the server socket processor threads close the connections that idle more than this */
val connectionsMaxIdleMs = props.getLong("connections.max.idle.ms", 10*60*1000L)
+ /* security protocol used to communicate between brokers. Defaults to PLAINTEXT. */
+ val interBrokerSecurityProtocol = SecurityProtocol.withName(props.getString("security.intra.broker.protocol", "PLAINTEXT"))
+
+ /* specify which version of the inter-broker protocol will be used
+ this is typically bumped after all brokers have been upgraded to the new version
+ valid values: 0.8.2.0, 0.8.3.0, 0.9.0.0
+ */
+
+ val intraBrokerProtocolVersion = ApiVersion.parseConfig(props.getString("use.intra.broker.protocol.version", "0.8.3.0"))
+
/*********** Log Configuration ***********/
/* the default number of log partitions per topic */
@@ -359,4 +406,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val compressionType = props.getString("compression.type", "producer").toLowerCase()
require(BrokerCompressionCodec.isValid(compressionType), "compression.type : "+compressionType + " is not valid." +
" Valid options are "+BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
+
+ validate(this)
+
}
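
The listener fallback rules above can be made concrete with a small self-contained sketch that mirrors getListeners()/getAdvertisedListeners() over a plain java.util.Properties; this illustrates the behavior, but is not the KafkaConfig code itself:

import java.util.Properties

object ListenerFallbackSketch {
  def listeners(props: Properties): String =
    if (props.containsKey("listeners"))
      props.getProperty("listeners")
    else
      "PLAINTEXT://" + props.getProperty("host.name", null) + ":" + props.getProperty("port", "6667")

  def advertisedListeners(props: Properties): String =
    if (props.containsKey("advertised.listeners"))
      props.getProperty("advertised.listeners")
    else if (props.containsKey("advertised.host.name") || props.containsKey("advertised.port"))
      "PLAINTEXT://" + props.getProperty("advertised.host.name", props.getProperty("host.name", null)) +
        ":" + props.getProperty("advertised.port", props.getProperty("port", "6667"))
    else
      listeners(props)

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("host.name", "myhost")
    props.put("port", "1111")
    println(listeners(props))           // PLAINTEXT://myhost:1111
    props.put("advertised.host.name", "otherhost")
    props.put("advertised.port", "2222")
    println(advertisedListeners(props)) // PLAINTEXT://otherhost:2222
  }
}
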
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4acdd70..b6ef750 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -17,6 +17,8 @@
package kafka.server
+import kafka.cluster.{SecurityProtocol, EndPoint}
+import kafka.cluster.SecurityProtocol.SecurityProtocol
import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
@@ -31,9 +33,8 @@ import java.net.InetAddress
* Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
* we are dead.
*/
-class KafkaHealthcheck(private val brokerId: Int,
- private val advertisedHost: String,
- private val advertisedPort: Int,
+class KafkaHealthcheck(private val brokerId: Int,
+ private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
private val zkSessionTimeoutMs: Int,
private val zkClient: ZkClient) extends Logging {
@@ -54,13 +55,18 @@ class KafkaHealthcheck(private val brokerId: Int,
* Register this broker as "alive" in zookeeper
*/
def register() {
- val advertisedHostName =
- if(advertisedHost == null || advertisedHost.trim.isEmpty)
- InetAddress.getLocalHost.getCanonicalHostName
- else
- advertisedHost
val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
- ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort)
+ val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
+ if (endpoint.host == null || endpoint.host.trim.isEmpty)
+ EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType)
+ else
+ endpoint
+ )
+
+ // the default host and port are here for compatibility with older clients
+ // only PLAINTEXT is supported as the default
+ val defaultEndpoint = updatedEndpoints(SecurityProtocol.PLAINTEXT)
+ ZkUtils.registerBrokerInZk(zkClient, brokerId, defaultEndpoint.host, defaultEndpoint.port, updatedEndpoints, zkSessionTimeoutMs, jmxPort)
}
/**
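
register() above substitutes the machine's canonical host name for any advertised endpoint whose host is null or empty before publishing to ZooKeeper. A sketch of that substitution with a simplified EndPoint case class (not kafka.cluster.EndPoint):

import java.net.InetAddress

object AdvertisedHostSketch {
  case class EndPoint(host: String, port: Int, protocolType: String)

  def withResolvedHosts(endpoints: Map[String, EndPoint]): Map[String, EndPoint] =
    endpoints.map { case (protocol, ep) =>
      if (ep.host == null || ep.host.trim.isEmpty)
        protocol -> ep.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
      else
        protocol -> ep
    }

  def main(args: Array[String]): Unit = {
    val resolved = withResolvedHosts(Map("PLAINTEXT" -> EndPoint(null, 9092, "PLAINTEXT")))
    println(resolved("PLAINTEXT").host) // e.g. myhost.example.com
  }
}
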
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 89200da..8a60b0f 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,6 +25,8 @@ import kafka.utils._
import java.util.concurrent._
import atomic.{AtomicInteger, AtomicBoolean}
import java.io.File
+import org.apache.kafka.common.protocol.ApiVersion
+
import collection.mutable
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
@@ -94,8 +96,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
this.logIdent = "[Kafka Server " + config.brokerId + "], "
socketServer = new SocketServer(config.brokerId,
- config.hostName,
- config.port,
+ Utils.listenerListToEndPoints(config.listeners),
config.numNetworkThreads,
config.queuedMaxRequests,
config.socketSendBufferBytes,
@@ -128,7 +129,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
topicConfigManager.startup()
/* tell everyone we are alive */
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+ val endpoints = Utils.listenerListToEndPoints(config.advertisedListeners)
+ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, endpoints, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
registerStats()
@@ -207,7 +209,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
if (channel != null) {
channel.disconnect()
}
- channel = new BlockingChannel(broker.host, broker.port,
+ channel = new BlockingChannel(broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).host,
+ broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index bf81a1a..aff86e3 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,9 +17,11 @@
package kafka.server
+import kafka.cluster.SecurityProtocol.SecurityProtocol
+
import scala.collection.{Seq, Set, mutable}
import kafka.api._
-import kafka.cluster.Broker
+import kafka.cluster.{BrokerEndpoint, Broker}
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.Utils._
import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
@@ -36,7 +38,7 @@ private[server] class MetadataCache {
private var aliveBrokers: Map[Int, Broker] = Map()
private val partitionMetadataLock = new ReentrantReadWriteLock()
- def getTopicMetadata(topics: Set[String]) = {
+ def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol) = {
val isAllTopics = topics.isEmpty
val topicsRequested = if(isAllTopics) cache.keySet else topics
val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
@@ -47,18 +49,21 @@ private[server] class MetadataCache {
val partitionMetadata = partitionStateInfos.map {
case (partitionId, partitionState) =>
val replicas = partitionState.allReplicas
- val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
- var leaderInfo: Option[Broker] = None
- var isrInfo: Seq[Broker] = Nil
+ val replicaInfo: Seq[BrokerEndpoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol))
+ var leaderInfo: Option[BrokerEndpoint] = None
+ var leaderBrokerInfo: Option[Broker] = None
+ var isrInfo: Seq[BrokerEndpoint] = Nil
val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
val leader = leaderIsrAndEpoch.leaderAndIsr.leader
val isr = leaderIsrAndEpoch.leaderAndIsr.isr
val topicPartition = TopicAndPartition(topic, partitionId)
try {
- leaderInfo = aliveBrokers.get(leader)
- if (!leaderInfo.isDefined)
+ leaderBrokerInfo = aliveBrokers.get(leader)
+ if (!leaderBrokerInfo.isDefined)
throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition))
- isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+ else
+ leaderInfo = Some(leaderBrokerInfo.get.getBrokerEndPoint(protocol))
+ isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).map(_.getBrokerEndPoint(protocol))
if (replicaInfo.size < replicas.size)
throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
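
getTopicMetadata is now protocol-aware because a broker advertises one endpoint per security protocol, and the metadata response must hand each client the endpoint matching the protocol it connected with. A sketch with simplified stand-in types (not the patch's Broker/BrokerEndpoint classes):

object ProtocolAwareMetadataSketch {
  case class BrokerEndpoint(id: Int, host: String, port: Int)

  case class Broker(id: Int, endpoints: Map[String, BrokerEndpoint]) {
    def getBrokerEndPoint(protocol: String): BrokerEndpoint =
      endpoints.getOrElse(protocol,
        throw new NoSuchElementException("broker %d has no %s endpoint".format(id, protocol)))
  }

  def main(args: Array[String]): Unit = {
    val broker = Broker(0, Map(
      "PLAINTEXT" -> BrokerEndpoint(0, "myhost", 9092),
      "TRACE"     -> BrokerEndpoint(0, "myhost", 9091)))
    // a client connected over TRACE is handed the TRACE endpoint
    println(broker.getBrokerEndPoint("TRACE")) // BrokerEndpoint(0,myhost,9091)
  }
}
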
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 351dbba..f0a2a5b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -17,13 +17,13 @@
package kafka.server
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
"Replica", brokerConfig.numReplicaFetchers) {
- override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+ override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = {
new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6879e73..b155cd1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,14 +18,14 @@
package kafka.server
import kafka.admin.AdminUtils
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import kafka.log.LogConfig
import kafka.message.ByteBufferMessageSet
import kafka.api.{OffsetRequest, FetchResponsePartitionData}
import kafka.common.{KafkaStorageException, TopicAndPartition}
class ReplicaFetcherThread(name:String,
- sourceBroker: Broker,
+ sourceBroker: BrokerEndpoint,
brokerConfig: KafkaConfig,
replicaMgr: ReplicaManager)
extends AbstractFetcherThread(name = name,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index fb948b9..4685de9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,7 @@ package kafka.server
import kafka.api._
import kafka.common._
import kafka.utils._
-import kafka.cluster.{Broker, Partition, Replica}
+import kafka.cluster.{BrokerEndpoint, Partition, Replica}
import kafka.log.{LogAppendInfo, LogManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
@@ -610,7 +610,7 @@ class ReplicaManager(val config: KafkaConfig,
* the error message will be set on each partition since we do not know which partition caused it
*/
private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
- leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
+ leaders: Set[BrokerEndpoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
offsetManager: OffsetManager) {
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index d1e7c43..03b121d 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -19,6 +19,7 @@ package kafka.tools
import joptsimple._
+import kafka.cluster.SecurityProtocol
import org.I0Itec.zkclient.ZkClient
import kafka.utils._
import kafka.consumer.SimpleConsumer
@@ -158,7 +159,7 @@ object ConsumerOffsetChecker extends Logging {
topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
- val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
+ val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs, SecurityProtocol.PLAINTEXT)
debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
channel.send(OffsetFetchRequest(group, topicPartitions))
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index ba6ddd7..d1050b4 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -18,7 +18,7 @@
package kafka.tools
import joptsimple.OptionParser
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
@@ -197,7 +197,7 @@ private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, c
private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
expectedNumFetchers: Int,
- brokerMap: Map[Int, Broker],
+ brokerMap: Map[Int, BrokerEndpoint],
initialOffsetTime: Long,
reportInterval: Long) extends Logging {
private val fetchOffsetMap = new Pool[TopicAndPartition, Long]
@@ -335,7 +335,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
}
}
-private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition],
+private class ReplicaFetcher(name: String, sourceBroker: BrokerEndpoint, topicAndPartitions: Iterable[TopicAndPartition],
replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
extends ShutdownableThread(name) {
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index b4f903b..7379fe3 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -22,7 +22,7 @@ import kafka.utils._
import kafka.consumer._
import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
import scala.collection.JavaConversions._
import kafka.common.TopicAndPartition
@@ -142,8 +142,8 @@ object SimpleConsumerShell extends Logging {
}
// validating replica id and initializing target broker
- var fetchTargetBroker: Broker = null
- var replicaOpt: Option[Broker] = null
+ var fetchTargetBroker: BrokerEndpoint = null
+ var replicaOpt: Option[BrokerEndpoint] = null
if(replicaId == UseLeaderReplica) {
replicaOpt = partitionMetadataOpt.get.leader
if(!replicaOpt.isDefined) {
@@ -167,7 +167,9 @@ object SimpleConsumerShell extends Logging {
System.exit(1)
}
if (startingOffset < 0) {
- val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
+ val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host,
+ fetchTargetBroker.port,
+ ConsumerConfig.SocketTimeout,
ConsumerConfig.SocketBufferSize, clientId)
try {
startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset,
@@ -188,8 +190,12 @@ object SimpleConsumerShell extends Logging {
val replicaString = if(replicaId > 0) "leader" else "replica"
info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
- .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
- val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
+ .format(topic, partitionId, replicaString, replicaId,
+ fetchTargetBroker.host,
+ fetchTargetBroker.port, startingOffset))
+ val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host,
+ fetchTargetBroker.port,
+ 10000, 64*1024, clientId)
val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
def run() {
var offset = startingOffset
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 111c9a8..c6c5a88 100644
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -17,6 +17,7 @@
package kafka.tools
+import kafka.cluster.SecurityProtocol
import org.I0Itec.zkclient.ZkClient
import kafka.consumer.{SimpleConsumer, ConsumerConfig}
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
@@ -65,7 +66,9 @@ object UpdateOffsetsInZK {
ZkUtils.getBrokerInfo(zkClient, broker) match {
case Some(brokerInfo) =>
- val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
+ val consumer = new SimpleConsumer(brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host,
+ brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port,
+ 10000, 100 * 1024, "UpdateOffsetsInZk")
val topicAndPartition = TopicAndPartition(topic, partition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 738c1af..b2dec5c 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -24,11 +24,15 @@ import java.nio.channels._
import java.util.concurrent.locks.{ReadWriteLock, Lock}
import java.lang.management._
import javax.management._
+
+import kafka.cluster.SecurityProtocol.SecurityProtocol
+
import scala.collection._
import scala.collection.mutable
import java.util.Properties
import kafka.common.KafkaException
import kafka.common.KafkaStorageException
+import kafka.cluster.EndPoint
/**
@@ -607,4 +611,9 @@ object Utils extends Logging {
.filter{ case (k,l) => (l > 1) }
.keys
}
+
+ def listenerListToEndPoints(listeners: String): immutable.Map[SecurityProtocol, EndPoint] = {
+ val listenerList = parseCsvList(listeners)
+ listenerList.map(listener => EndPoint.createEndPoint(listener)).map(ep => ep.protocolType -> ep).toMap
+ }
}
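
A usage sketch for listenerListToEndPoints, with a toy parser standing in for kafka.cluster.EndPoint.createEndPoint (the real parser also handles IPv6 literals such as PLAINTEXT://[::1]:9092; this covers only the simple cases):

object ListenerListSketch {
  case class EndPoint(host: String, port: Int, protocolType: String)

  private val uriRegex = "(.+)://([^:]*):([0-9]+)".r

  def createEndPoint(listener: String): EndPoint = listener.trim match {
    case uriRegex(protocol, host, port) =>
      EndPoint(if (host.isEmpty) null else host, port.toInt, protocol)
  }

  def listenerListToEndPoints(listeners: String): Map[String, EndPoint] =
    listeners.split(",").map(createEndPoint).map(ep => ep.protocolType -> ep).toMap

  def main(args: Array[String]): Unit = {
    val endpoints = listenerListToEndPoints("PLAINTEXT://myhost:9092,TRACE://:9091")
    println(endpoints("PLAINTEXT")) // EndPoint(myhost,9092,PLAINTEXT)
    println(endpoints("TRACE"))     // EndPoint(null,9091,TRACE)
  }
}
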
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c14bd45..2de1223 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,7 +17,8 @@
package kafka.utils
-import kafka.cluster.{Broker, Cluster}
+import kafka.cluster.SecurityProtocol.SecurityProtocol
+import kafka.cluster._
import kafka.consumer.{ConsumerThreadId, TopicCount}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
@@ -83,6 +84,10 @@ object ZkUtils extends Logging {
brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
}
+ def getAllBrokerEndPointsForChannel(zkClient: ZkClient, protocolType: SecurityProtocol): Seq[BrokerEndpoint] = {
+ getAllBrokersInCluster(zkClient).map(_.getBrokerEndPoint(protocolType))
+ }
+
def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
}
@@ -168,12 +173,28 @@ object ZkUtils extends Logging {
}
}
- def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
+ /**
+ * Register the broker with the v2 json format (which includes multiple endpoints).
+ * This format also includes a default host and port for compatibility with older clients.
+ * @param zkClient ZooKeeper client to register through
+ * @param id broker id
+ * @param host default (PLAINTEXT) host advertised for older clients
+ * @param port default (PLAINTEXT) port advertised for older clients
+ * @param advertisedEndpoints all advertised endpoints, keyed by security protocol
+ * @param timeout ZooKeeper session timeout, in milliseconds
+ * @param jmxPort JMX port, or -1 if JMX is not enabled
+ */
+ def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], timeout: Int, jmxPort: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val timestamp = SystemTime.milliseconds.toString
- val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
- val expectedBroker = new Broker(id, host, port)
+ val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints" -> advertisedEndpoints.values.mkString(","), "jmx_port" -> jmxPort, "timestamp" -> timestamp))
+ val expectedBroker = new Broker(id, advertisedEndpoints)
+
+ registerBrokerInZk(zkClient, brokerIdPath, brokerInfo, expectedBroker, timeout)
+
+ info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
+ }
+
+ def registerBrokerInZk(zkClient: ZkClient, brokerIdPath: String, brokerInfo: String, expectedBroker: Broker, timeout: Int) {
try {
createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker,
(brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]),
@@ -182,11 +203,10 @@ object ZkUtils extends Logging {
} catch {
case e: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
- + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
- + "else you have shutdown this broker and restarted it faster than the zookeeper "
- + "timeout so it appears to be re-registering.")
+ + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
+ + "else you have shutdown this broker and restarted it faster than the zookeeper "
+ + "timeout so it appears to be re-registering.")
}
- info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
}
def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
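
The version-2 registration payload written above repeats the PLAINTEXT host and port at the top level so version-1 clients keep working, while "endpoints" carries every advertised endpoint. An illustrative sketch of the resulting JSON (the patch itself builds it with Json.encode):

object BrokerJsonSketch {
  def brokerInfoJson(host: String, port: Int, endpoints: Seq[String], jmxPort: Int, timestamp: Long): String =
    """{"version":2,"host":"%s","port":%d,"endpoints":"%s","jmx_port":%d,"timestamp":"%d"}"""
      .format(host, port, endpoints.mkString(","), jmxPort, timestamp)

  def main(args: Array[String]): Unit = {
    println(brokerInfoJson("localhost", 9092,
      Seq("PLAINTEXT://localhost:9092", "TRACE://localhost:9091"), 9999, 1416974968782L))
  }
}
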
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 90c0b7a..fc2d5a3 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -19,17 +19,16 @@ package kafka.api.test
import org.junit.Test
import org.junit.Assert._
-
import java.lang.Integer
import java.util.{Properties, Random}
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
import kafka.api.FetchRequestBuilder
import kafka.common.Topic
-import kafka.consumer.SimpleConsumer
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
-import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
+import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils, Utils}
+import kafka.consumer.SimpleConsumer
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException}
@@ -74,8 +73,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
super.setUp()
// TODO: we need to migrate to new consumers when 0.9 is final
- consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
- consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
+
+ val endpoint1 = Utils.listenerListToEndPoints(configs(0).listeners).values.head
+ val endpoint2 = Utils.listenerListToEndPoints(configs(1).listeners).values.head
+ consumer1 = new SimpleConsumer("localhost", endpoint1.port, 100, 1024*1024, "")
+ consumer2 = new SimpleConsumer("localhost", endpoint2.port, 100, 1024*1024, "")
producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
@@ -273,7 +275,6 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
server.shutdown()
server.awaitShutdown()
server.startup
-
Thread.sleep(2000)
}
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index b15237b..ee71fd5 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -25,7 +25,7 @@ import org.junit.Test
import org.junit.Assert._
import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.{Utils, TestZKUtils, TestUtils}
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequestBuilder
import kafka.message.Message
@@ -55,8 +55,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
super.setUp()
// TODO: we need to migrate to new consumers when 0.9 is final
- consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
- consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
+ val endpoint1 = Utils.listenerListToEndPoints(configs(0).listeners).values.head
+ val endpoint2 = Utils.listenerListToEndPoints(configs(1).listeners).values.head
+ consumer1 = new SimpleConsumer("localhost", endpoint1.port, 100, 1024*1024, "")
+ consumer2 = new SimpleConsumer("localhost", endpoint2.port, 100, 1024*1024, "")
}
override def tearDown() {
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 41f334d..28f70b4 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -1,5 +1,6 @@
package other.kafka
+import kafka.cluster.SecurityProtocol
import org.I0Itec.zkclient.ZkClient
import kafka.api._
import kafka.utils.{ShutdownableThread, ZKStringSerializer}
@@ -110,7 +111,7 @@ object TestOffsetManager {
private val fetchTimer = new KafkaTimer(timer)
private val channels = mutable.Map[Int, BlockingChannel]()
- private var metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+ private var metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SecurityProtocol.PLAINTEXT, SocketTimeoutMs)
private val numErrors = new AtomicInteger(0)
@@ -156,7 +157,7 @@ object TestOffsetManager {
println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port))
metadataChannel.disconnect()
println("Creating new query channel.")
- metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+ metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SecurityProtocol.PLAINTEXT, SocketTimeoutMs)
}
finally {
Thread.sleep(fetchIntervalMs)
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 4d36b8b..fc49d91 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -18,8 +18,11 @@ package unit.kafka
import java.io.{FileOutputStream, File}
import java.security.Permission
+import java.util.Properties
import kafka.Kafka
+import kafka.common.InvalidConfigException
+import kafka.server.KafkaConfig
import org.junit.{After, Before, Test}
import junit.framework.Assert._
@@ -65,14 +68,14 @@ class KafkaTest {
assertEquals(2, config2.brokerId)
// We should be also able to set completely new property
- val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987"))
+ val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact"))
assertEquals(1, config3.brokerId)
- assertEquals(1987, config3.port)
+ assertEquals("compact", config3.logCleanupPolicy)
// We should be also able to set several properties
- val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987", "--override", "broker.id=2"))
+ val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact", "--override", "broker.id=2"))
assertEquals(2, config4.brokerId)
- assertEquals(1987, config4.port)
+ assertEquals("compact", config4.logCleanupPolicy)
}
@Test(expected = classOf[ExitCalled])
@@ -99,6 +102,60 @@ class KafkaTest {
Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))
}
+ @Test
+ def testDuplicateListeners() {
+ val props = new Properties()
+ props.put("broker.id", "1")
+ props.put("zookeeper.connect", "localhost:2181")
+
+ // listeners with duplicate port
+ props.put("listeners", "PLAINTEXT://localhost:9091,TRACE://localhost:9091")
+
+ assert(!isValidKafkaConfig(props))
+
+ // listeners with duplicate protocol
+ props.put("listeners", "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
+
+ assert(!isValidKafkaConfig(props))
+
+ // advertised listeners with duplicate port
+ props.put("advertised,listeners", "PLAINTEXT://localhost:9091,TRACE://localhost:9091")
+
+ assert(!isValidKafkaConfig(props))
+
+ }
+
+ @Test
+ def testListenerDefaults() {
+ val props = new Properties()
+ props.put("broker.id", "1")
+ props.put("zookeeper.connect", "localhost:2181")
+
+ // configuration with host and port, but no listeners
+ props.put("host.name", "myhost")
+ props.put("port", "1111")
+
+ val conf = new KafkaConfig(props)
+ assertEquals("PLAINTEXT://myhost:1111", conf.listeners)
+
+ // configuration with advertised host and port, and no advertised listeners
+ props.put("advertised.host.name", "otherhost")
+ props.put("advertised.port", "2222")
+
+ val conf2 = new KafkaConfig(props)
+ assertEquals("PLAINTEXT://otherhost:2222", conf2.advertisedListeners)
+
+ }
+
+ def isValidKafkaConfig(props: Properties): Boolean = {
+ try {
+ new KafkaConfig(props)
+ true
+ } catch {
+ case e: InvalidConfigException => false
+ }
+ }
+
def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
}
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 1bf2667..f0b4135 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -22,7 +22,7 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.utils.{ZkUtils, Utils, TestUtils}
-import kafka.cluster.Broker
+import kafka.cluster.{SecurityProtocol, Broker}
import kafka.client.ClientUtils
import kafka.server.{KafkaConfig, KafkaServer}
@@ -61,7 +61,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
servers ++= List(server1, server2, server3, server4)
- brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
+ brokers = servers.map(s => new Broker(s.config.brokerId, Utils.listenerListToEndPoints(s.config.listeners)))
// create topics first
createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
@@ -109,7 +109,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions",
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testIncrementPartitions",
2000,0).topicsMetadata
val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata
@@ -134,7 +134,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas",
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testManualAssignmentOfReplicas",
2000,0).topicsMetadata
val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2))
val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata
@@ -158,7 +158,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement",
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacement",
2000,0).topicsMetadata
val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index a1f72f8..9dbd430 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -22,7 +22,7 @@ import org.scalatest.junit.JUnitSuite
import junit.framework.Assert._
import java.nio.ByteBuffer
import kafka.message.{Message, ByteBufferMessageSet}
-import kafka.cluster.Broker
+import kafka.cluster.{BrokerEndpoint, SecurityProtocol, EndPoint, Broker}
import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError}
import kafka.utils.SystemTime
import org.apache.kafka.common.requests._
@@ -80,11 +80,15 @@ object SerializationTestUtils {
TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
)
- private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013))
- private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0)
- private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1)
- private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2)
- private val partitionMetaData3 = new PartitionMetadata(3, Some(brokers.head), replicas = brokers, isr = brokers.tail.tail, errorCode = 3)
+ private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))),
+ new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))),
+ new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT))))
+ private val brokerEndpoints = brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+
+ private val partitionMetaData0 = new PartitionMetadata(0, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 0)
+ private val partitionMetaData1 = new PartitionMetadata(1, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail, errorCode = 1)
+ private val partitionMetaData2 = new PartitionMetadata(2, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 2)
+ private val partitionMetaData3 = new PartitionMetadata(3, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail.tail, errorCode = 3)
private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
@@ -94,7 +98,7 @@ object SerializationTestUtils {
val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, isr1.toSet)),
((topic2, 0), PartitionStateInfo(leaderAndIsr2, isr2.toSet)))
- new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "")
+ new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[BrokerEndpoint](), 0, 1, 0, "")
}
def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
@@ -148,7 +152,7 @@ object SerializationTestUtils {
}
def createTestTopicMetadataResponse: TopicMetadataResponse = {
- new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1)
+ new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)).toVector, Seq(topicmetaData1, topicmetaData2), 1)
}
def createTestOffsetCommitRequestV1: OffsetCommitRequest = {
@@ -192,7 +196,7 @@ object SerializationTestUtils {
}
def createConsumerMetadataResponse: ConsumerMetadataResponse = {
- ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0)
+ ConsumerMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
}
def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = {
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerTest.scala
new file mode 100644
index 0000000..b9c7fd1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerTest.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import java.nio.ByteBuffer
+
+import kafka.utils.Logging
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+
+import scala.collection.mutable
+
+class BrokerTest extends JUnit3Suite with Logging {
+
+ @Test
+ def testSerDe() = {
+
+ val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
+ val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint)
+ val origBroker = new Broker(1, listEndPoints)
+ val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes)
+
+ origBroker.writeTo(brokerBytes)
+
+ val newBroker = Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer])
+ assert(origBroker == newBroker)
+ }
+
+ @Test
+ def testHashAndEquals() = {
+ val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
+ val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
+ val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT)
+ val endpoint4 = new EndPoint("other", 1111, SecurityProtocol.PLAINTEXT)
+ val broker1 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint1))
+ val broker2 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint2))
+ val broker3 = new Broker(2, Map(SecurityProtocol.PLAINTEXT -> endpoint3))
+ val broker4 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint4))
+
+ assert(broker1 == broker2)
+ assert(broker1 != broker3)
+ assert(broker1 != broker4)
+ assert(broker1.hashCode() == broker2.hashCode())
+ assert(broker1.hashCode() != broker3.hashCode())
+ assert(broker1.hashCode() != broker4.hashCode())
+
+ val hashmap = new mutable.HashMap[Broker, Int]()
+ hashmap.put(broker1, 1)
+ assert(hashmap.getOrElse(broker1, -1) == 1)
+ }
+
+ @Test
+ def testFromJSON() = {
+ val brokerInfoStr = "{\"version\":2," +
+ "\"host\":\"localhost\"," +
+ "\"port\":9092," +
+ "\"jmx_port\":9999," +
+ "\"timestamp\":\"1416974968782\"," +
+ "\"endpoints\":\"PLAINTEXT://localhost:9092\"}"
+ val broker = Broker.createBroker(1, brokerInfoStr)
+ assert(broker.id == 1)
+ assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "localhost")
+ assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9092)
+ }
+
+ @Test
+ def testFromOldJSON() = {
+ val brokerInfoStr = "{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}"
+ val broker = Broker.createBroker(1, brokerInfoStr)
+ assert(broker.id == 1)
+ assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "172.16.8.243")
+ assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9091)
+ }
+
+ @Test
+ def testBrokerEndpointFromURI() = {
+ var connectionString = "localhost:9092"
+ var endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString)
+ assert(endpoint.host == "localhost")
+ assert(endpoint.port == 9092)
+ // also test for ipv6
+ connectionString = "[::1]:9092"
+ endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString)
+ assert(endpoint.host == "::1")
+ assert(endpoint.port == 9092)
+ }
+
+ @Test
+ def testEndpointFromURI() = {
+ var connectionString = "PLAINTEXT://localhost:9092"
+ var endpoint = EndPoint.createEndPoint(connectionString)
+ assert(endpoint.host == "localhost")
+ assert(endpoint.port == 9092)
+ assert(endpoint.toString == "PLAINTEXT://localhost:9092")
+ // also test for default bind
+ connectionString = "PLAINTEXT://:9092"
+ endpoint = EndPoint.createEndPoint(connectionString)
+ assert(endpoint.host == null)
+ assert(endpoint.port == 9092)
+ assert(endpoint.toString == "PLAINTEXT://[null]:9092")
+ // also test for ipv6
+ connectionString = "PLAINTEXT://[::1]:9092"
+ endpoint = EndPoint.createEndPoint(connectionString)
+ assert(endpoint.host == "::1")
+ assert(endpoint.port == 9092)
+ assert(endpoint.toString == "PLAINTEXT://[::1]:9092")
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index c0355cc..a77ab49 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -46,7 +46,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
val group = "group1"
val consumer0 = "consumer0"
val consumedOffset = 5
- val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
+ val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, Utils.listenerListToEndPoints(c.listeners))))
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
0,
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 25845ab..9da447c 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -29,7 +29,7 @@ import kafka.consumer._
import kafka.serializer._
import kafka.producer.{KeyedMessage, Producer}
import kafka.utils.TestUtils._
-import kafka.utils.TestUtils
+import kafka.utils.{Utils, TestUtils}
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -39,7 +39,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
yield new KafkaConfig(props)
val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
val topic = "topic"
- val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
+ val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, Utils.listenerListToEndPoints(c.listeners))))
val shutdown = ZookeeperConsumerConnector.shutdownCommand
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index ef4c9ae..334242e 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -18,6 +18,8 @@
package kafka.integration
import java.util.Arrays
+import kafka.cluster.SecurityProtocol
+
import scala.collection.mutable.Buffer
import kafka.server._
import kafka.utils.{Utils, TestUtils}
@@ -38,7 +40,10 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
- def bootstrapUrl = configs.map(c => c.hostName + ":" + c.port).mkString(",")
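+ // derive the bootstrap address from the PLAINTEXT listener rather than the removed hostName/port configs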
+ def bootstrapUrl = configs.map(c => {
+ val endpoint = Utils.listenerListToEndPoints(c.listeners)(SecurityProtocol.PLAINTEXT)
+ endpoint.host + ":" + endpoint.port
+ }).mkString(",")
override def setUp() {
super.setUp
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 35dc071..66aedda 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -22,8 +22,8 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.admin.AdminUtils
import java.nio.ByteBuffer
import junit.framework.Assert._
-import kafka.cluster.Broker
-import kafka.utils.TestUtils
+import kafka.cluster.{SecurityProtocol, Broker}
+import kafka.utils.{Utils, TestUtils}
import kafka.utils.TestUtils._
import kafka.server.{KafkaServer, KafkaConfig}
import kafka.api.TopicMetadataRequest
@@ -34,7 +34,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p))
private var server1: KafkaServer = null
- val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
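+ // client-side metadata fetches now take per-protocol BrokerEndPoints instead of full Broker objects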
+ val brokerEndPoints = configs.map(c => new Broker(c.brokerId, Utils.listenerListToEndPoints(c.listeners)).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
override def setUp() {
super.setUp()
@@ -67,7 +67,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val topic = "test"
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
- var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+ var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
2000,0).topicsMetadata
assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
@@ -87,7 +87,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
// issue metadata request with empty list of topics
- var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata",
+ var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
2000, 0).topicsMetadata
assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
assertEquals(2, topicsMetadata.size)
@@ -106,7 +106,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
def testAutoCreateTopic {
// auto create topic
val topic = "testAutoCreateTopic"
- var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic",
+ var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
2000,0).topicsMetadata
assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
@@ -118,7 +118,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
// retry the metadata for the auto created topic
- topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+ topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
2000,0).topicsMetadata
assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c2dd8eb..9b1df44 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -37,7 +37,7 @@ class LogTest extends JUnitSuite {
@Before
def setUp() {
logDir = TestUtils.tempDir()
- val props = TestUtils.createBrokerConfig(0, -1)
+ val props = TestUtils.createBrokerConfig(0, 1)
config = new KafkaConfig(props)
}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 78b431f..be553e3 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -19,6 +19,8 @@ package kafka.network;
import java.net._
import java.io._
+import kafka.cluster.SecurityProtocol.SecurityProtocol
+import kafka.cluster.{SecurityProtocol, EndPoint}
import org.junit._
import org.scalatest.junit.JUnitSuite
import java.util.Random
@@ -34,9 +36,12 @@ import scala.collection.Map
class SocketServerTest extends JUnitSuite {
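+ // pick two free ports up front so the PLAINTEXT and TRACE listeners never collide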
+ val ports = kafka.utils.TestUtils.choosePorts(2)
+ val plaintextPort = ports.head
+ val tracePort = ports.last
val server: SocketServer = new SocketServer(0,
- host = null,
- port = kafka.utils.TestUtils.choosePort,
+ Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, plaintextPort, SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.TRACE -> EndPoint(null, tracePort, SecurityProtocol.TRACE)),
numProcessorThreads = 1,
maxQueuedRequests = 50,
sendBufferSize = 300000,
@@ -73,7 +78,10 @@ class SocketServerTest extends JUnitSuite {
channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
}
- def connect(s:SocketServer = server) = new Socket("localhost", s.port)
+
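+ // connect to the listener registered for the requested protocol (PLAINTEXT by default)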
+ def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
+ new Socket("localhost", s.endpoints.get(protocol).get.port)
+ }
@After
def cleanup() {
@@ -81,7 +89,8 @@ class SocketServerTest extends JUnitSuite {
}
@Test
def simpleRequest() {
- val socket = connect()
+ val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val traceSocket = connect(protocol = SecurityProtocol.TRACE)
val correlationId = -1
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
@@ -95,9 +104,15 @@ class SocketServerTest extends JUnitSuite {
val serializedBytes = new Array[Byte](byteBuffer.remaining)
byteBuffer.get(serializedBytes)
- sendRequest(socket, 0, serializedBytes)
+ // Test PLAINTEXT socket
+ sendRequest(plainSocket, 0, serializedBytes)
processRequest(server.requestChannel)
- assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
+ assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
+
+ // Test TRACE socket
+ sendRequest(traceSocket, 0, serializedBytes)
+ processRequest(server.requestChannel)
+ assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq)
}
@Test(expected = classOf[IOException])
@@ -129,18 +144,31 @@ class SocketServerTest extends JUnitSuite {
"Socket key should be available for reads")
}
- @Test(expected = classOf[IOException])
+ @Test
def testSocketsCloseOnShutdown() {
// open a connection
- val socket = connect()
+ val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val traceSocket = connect(protocol = SecurityProtocol.TRACE)
val bytes = new Array[Byte](40)
// send a request first to make sure the connection has been picked up by the socket server
- sendRequest(socket, 0, bytes)
+ sendRequest(plainSocket, 0, bytes)
+ sendRequest(traceSocket, 0, bytes)
processRequest(server.requestChannel)
// then shutdown the server
server.shutdown()
// doing a subsequent send should throw an exception as the connection should be closed.
- sendRequest(socket, 0, bytes)
+ try {
+ sendRequest(plainSocket, 0, bytes)
+ fail("expected exception when writing to closed plain socket")
+ } catch {
+ case _: IOException => // expected
+ }
+ try {
+ sendRequest(traceSocket, 0, bytes)
+ fail("expected exception when writing to closed trace socket")
+ } catch {
+ case _: IOException => // expected
+ }
}
@Test
@@ -158,8 +186,7 @@ class SocketServerTest extends JUnitSuite {
val overrideNum = 6
val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
val overrideServer: SocketServer = new SocketServer(0,
- host = null,
- port = kafka.utils.TestUtils.choosePort,
+ Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, kafka.utils.TestUtils.choosePort, SecurityProtocol.PLAINTEXT)),
numProcessorThreads = 1,
maxQueuedRequests = 50,
sendBufferSize = 300000,
@@ -177,4 +204,5 @@ class SocketServerTest extends JUnitSuite {
assertEquals(-1, conn.getInputStream.read())
overrideServer.shutdown()
}
+
}
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 1db6ac3..450aa59 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -23,7 +23,7 @@ import junit.framework.Assert._
import org.easymock.EasyMock
import org.junit.Test
import kafka.api._
-import kafka.cluster.Broker
+import kafka.cluster.{BrokerEndpoint, Broker}
import kafka.common._
import kafka.message._
import kafka.producer.async._
@@ -163,8 +163,8 @@ class AsyncProducerTest extends JUnit3Suite {
val props = new Properties()
props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
- val broker1 = new Broker(0, "localhost", 9092)
- val broker2 = new Broker(1, "localhost", 9093)
+ val broker1 = new BrokerEndpoint(0, "localhost", 9092)
+ val broker2 = new BrokerEndpoint(1, "localhost", 9093)
// form expected partitions metadata
val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
@@ -467,7 +467,7 @@ class AsyncProducerTest extends JUnit3Suite {
}
private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
- val broker1 = new Broker(brokerId, brokerHost, brokerPort)
+ val broker1 = new BrokerEndpoint(brokerId, brokerHost, brokerPort)
new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
}
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index d60d8e0..fe456e6 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -21,6 +21,7 @@ import java.net.SocketTimeoutException
import java.util.Properties
import junit.framework.Assert
import kafka.admin.AdminUtils
+import kafka.cluster.SecurityProtocol
import kafka.integration.KafkaServerTestHarness
import kafka.message._
import kafka.server.KafkaConfig
@@ -39,7 +40,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testReachableServer() {
val server = servers.head
- val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
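+ // SocketServer no longer exposes a single port; look it up via the PLAINTEXT endpoint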
+ val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port
+ val props = TestUtils.getSyncProducerConfig(port)
val producer = new SyncProducer(new SyncProducerConfig(props))
val firstStart = SystemTime.milliseconds
@@ -74,7 +76,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testEmptyProduceRequest() {
val server = servers.head
- val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ val port = server.socketServer.endpoints(SecurityProtocol.PLAINTEXT).port
+ val props = TestUtils.getSyncProducerConfig(port)
val correlationId = 0
val clientId = SyncProducerConfig.DefaultClientId
@@ -91,7 +94,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testMessageSizeTooLarge() {
val server = servers.head
- val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port
+ val props = TestUtils.getSyncProducerConfig(port)
val producer = new SyncProducer(new SyncProducerConfig(props))
TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
@@ -118,8 +122,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testMessageSizeTooLargeWithAckZero() {
val server = servers.head
- val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port
+ val props = TestUtils.getSyncProducerConfig(port)
props.put("request.required.acks", "0")
val producer = new SyncProducer(new SyncProducerConfig(props))
@@ -145,7 +150,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testProduceCorrectlyReceivesResponse() {
val server = servers.head
- val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port
+ val props = TestUtils.getSyncProducerConfig(port)
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
@@ -191,7 +197,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val timeoutMs = 500
val server = servers.head
- val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port
+ val props = TestUtils.getSyncProducerConfig(port)
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
@@ -217,7 +224,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testProduceRequestWithNoResponse() {
val server = servers.head
- val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port
+ val props = TestUtils.getSyncProducerConfig(port)
val correlationId = 0
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
@@ -232,8 +240,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
def testNotEnoughReplicas() {
val topicName = "minisrtest"
val server = servers.head
- val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port
+ val props = TestUtils.getSyncProducerConfig(port)
props.put("request.required.acks", "-1")
val producer = new SyncProducer(new SyncProducerConfig(props))
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index f0c4a56..d56503f 100644
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -17,6 +17,7 @@
package kafka.server
+import kafka.cluster.SecurityProtocol
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import junit.framework.Assert._
@@ -31,9 +32,8 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
override def setUp() {
super.setUp()
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
- props.put("advertised.host.name", advertisedHostName)
- props.put("advertised.port", advertisedPort.toString)
-
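+ // a single advertised.listeners URI replaces the advertised.host.name/advertised.port pair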
+ props.put("advertised.listeners", SecurityProtocol.PLAINTEXT.toString+"://"+advertisedHostName+":"+advertisedPort.toString)
+
server = TestUtils.createServer(new KafkaConfig(props))
}
@@ -45,8 +45,9 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
def testBrokerAdvertiseToZK {
val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId)
- assertEquals(advertisedHostName, brokerInfo.get.host)
- assertEquals(advertisedPort, brokerInfo.get.port)
+ val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get
+ assertEquals(advertisedHostName, endpoint.host)
+ assertEquals(advertisedPort, endpoint.port)
}
}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 82dce80..32ef5f5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -17,12 +17,13 @@
package kafka.server
+import kafka.cluster.SecurityProtocol
import org.junit.Test
import junit.framework.Assert._
import org.scalatest.junit.JUnit3Suite
-import kafka.utils.TestUtils
import kafka.message.GZIPCompressionCodec
import kafka.message.NoCompressionCodec
+import kafka.utils.{Utils, TestUtils}
class KafkaConfigTest extends JUnit3Suite {
@@ -93,12 +94,13 @@ class KafkaConfigTest extends JUnit3Suite {
val hostName = "fake-host"
val props = TestUtils.createBrokerConfig(0, port)
- props.put("host.name", hostName)
+ props.put("listeners", "PLAINTEXT://"+hostName+":"+port)
val serverConfig = new KafkaConfig(props)
-
- assertEquals(serverConfig.advertisedHostName, hostName)
- assertEquals(serverConfig.advertisedPort, port)
+ val endpoints = Utils.listenerListToEndPoints(serverConfig.advertisedListeners)
+ val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get
+ assertEquals(endpoint.host, hostName)
+ assertEquals(endpoint.port, port)
}
@Test
@@ -108,13 +110,14 @@ class KafkaConfigTest extends JUnit3Suite {
val advertisedPort = 1234
val props = TestUtils.createBrokerConfig(0, port)
- props.put("advertised.host.name", advertisedHostName)
- props.put("advertised.port", advertisedPort.toString)
+ props.put("advertised.listeners", "PLAINTEXT://"+advertisedHostName+":"+advertisedPort.toString)
val serverConfig = new KafkaConfig(props)
+ val endpoints = Utils.listenerListToEndPoints(serverConfig.advertisedListeners)
+ val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get
- assertEquals(serverConfig.advertisedHostName, advertisedHostName)
- assertEquals(serverConfig.advertisedPort, advertisedPort)
+ assertEquals(endpoint.host, advertisedHostName)
+ assertEquals(endpoint.port, advertisedPort)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index c2ba07c..aecf1f8 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -23,7 +23,7 @@ import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.utils.{ZkUtils, Utils, TestUtils}
import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager}
-import kafka.cluster.Broker
+import kafka.cluster.{SecurityProtocol, Broker}
import kafka.common.ErrorMapping
import kafka.api._
@@ -118,7 +118,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
// start another controller
val controllerId = 2
val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
- val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
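+ // LeaderAndIsrRequest now carries protocol-specific BrokerEndPoints rather than full Broker objects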
+ val brokers = servers.map(s => new Broker(s.config.brokerId, Utils.listenerListToEndPoints(s.config.listeners)))
+ val brokerEndPoints = brokers.map(b => b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
val controllerContext = new ControllerContext(zkClient, 6000)
controllerContext.liveBrokers = brokers.toSet
val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
@@ -128,7 +129,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
leaderAndIsr.put((topic, partitionId),
new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, Set(0,1))).toMap
- val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId,
+ val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokerEndPoints.toSet, controllerId,
staleControllerEpoch, 0, "")
controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index c06ee75..0476a59 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -197,7 +197,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
val props = new Properties
props.put("broker.id", nodeId.toString)
- props.put("port", port.toString)
+ props.put("listeners", "PLAINTEXT://localhost:" + port.toString)
props.put("log.dir", getLogDir.getAbsolutePath)
props.put("log.flush.interval.messages", "1")
props.put("enable.zookeeper", "false")
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 54755e8..bfec0d0 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -34,7 +34,7 @@ import kafka.server._
import kafka.producer._
import kafka.message._
import kafka.api._
-import kafka.cluster.Broker
+import kafka.cluster.{SecurityProtocol, Broker}
import kafka.consumer.{KafkaStream, ConsumerConfig}
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
@@ -146,7 +146,10 @@ object TestUtils extends Logging {
}
def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
- configs.map(c => formatAddress(c.hostName, c.port)).mkString(",")
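+ // build the broker list string from each config's PLAINTEXT endpoint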
+ configs.map(c => {
+ val endpoint = Utils.listenerListToEndPoints(c.listeners).get(SecurityProtocol.PLAINTEXT).get
+ formatAddress(endpoint.host, endpoint.port)
+ }).mkString(",")
}
/**
@@ -156,8 +159,7 @@ object TestUtils extends Logging {
enableControlledShutdown: Boolean = true): Properties = {
val props = new Properties
if (nodeId >= 0) props.put("broker.id", nodeId.toString)
- props.put("host.name", "localhost")
- props.put("port", port.toString)
+ props.put("listeners", "PLAINTEXT://localhost:"+port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
props.put("replica.socket.timeout.ms", "1500")
@@ -446,13 +448,13 @@ object TestUtils extends Logging {
}
def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
- val brokers = ids.map(id => new Broker(id, "localhost", 6667))
- brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, 6000, jmxPort = -1))
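+ // brokers now register their full endpoint list in ZK alongside the legacy host/port fields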
+ val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
+ brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, "localhost", 6667, b.endPoints, 6000, jmxPort = -1))
brokers
}
def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
- val brokers = ids.map(id => new Broker(id, "localhost", 6667))
+ val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b))
brokers
}
diff --git a/system_test/replication_testsuite/testcase_1/testcase_1_properties.json b/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
index 0c6d7a3..680213f 100644
--- a/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
+++ b/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
@@ -32,7 +32,7 @@
"entity_id": "1",
"port": "9091",
"broker.id": "1",
- "log.segment.bytes": "20480",
+ "log.segment.bytes": "10000000",
"log.dir": "/tmp/kafka_server_1_logs",
"log_filename": "kafka_server_9091.log",
"config_filename": "kafka_server_9091.properties"
@@ -41,7 +41,7 @@
"entity_id": "2",
"port": "9092",
"broker.id": "2",
- "log.segment.bytes": "20480",
+ "log.segment.bytes": "10000000",
"log.dir": "/tmp/kafka_server_2_logs",
"log_filename": "kafka_server_9092.log",
"config_filename": "kafka_server_9092.properties"
@@ -50,7 +50,7 @@
"entity_id": "3",
"port": "9093",
"broker.id": "3",
- "log.segment.bytes": "20480",
+ "log.segment.bytes": "10000000",
"log.dir": "/tmp/kafka_server_3_logs",
"log_filename": "kafka_server_9093.log",
"config_filename": "kafka_server_9093.properties"
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 41d511c..a9b73f7 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -436,6 +436,7 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv
addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5"
addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter"
addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true"
+ addedCSVConfig["listeners"] = "PLAINTEXT://localhost:"+tcCfg["port"]
if brokerVersion == "0.7":
addedCSVConfig["brokerid"] = tcCfg["brokerid"]