diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 122375c..74539dc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -166,6 +166,10 @@ public class ProducerConfig extends AbstractConfig { public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface."; + /** security.protocol */ + public static final String SECURITY_PROTOCOL = "security.protocol"; + private static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Currently only PLAINTEXT is supported. SSL and Kerberos are planned for the near future"; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -214,7 +218,13 @@ public class ProducerConfig extends AbstractConfig { Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) + .define(SECURITY_PROTOCOL, + Type.STRING, + "PLAINTEXT", + in("PLAINTEXT"), + Importance.MEDIUM, + SECURITY_PROTOCOL_DOC); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java new file mode 100644 index 0000000..e3a6ebc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol; + +/** + * This class contains the different Kafka versions. + * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves. + * This is only for intra-broker communications - when communicating with clients, the client decides on the API version. + * + * Note that ORDER MATTERS in the enum. + * We consider version A as newer than B if it appears later in the list of constants here. + * If you add new versions, add them in the correct chronological release order. + */ +public enum ApiVersion { + KAFKA_0820("0.8.2.0"), + KAFKA_0830("0.8.3.0"); + + private final String version; + + public boolean onOrAfter(ApiVersion other) { + return compareTo(other) >= 0; + } + + private ApiVersion(final String version) { + this.version = version; + } + + /* Parse user readable version number. This assumes the convention of 0.8.2.0, 0.8.3.0, 0.9.0.0, etc. */ + public static ApiVersion parseConfig(String version) { + String[] vals = version.split("\\."); + StringBuilder parsed = new StringBuilder(); + parsed.append("KAFKA_"); + + for (String v: vals) { + parsed.append(v); + } + return ApiVersion.valueOf(parsed.toString()); + } + + public static ApiVersion getLatestVersion() { + ApiVersion[] values = ApiVersion.values(); + return values[values.length - 1]; + } + + @Override + public String toString() { + return version; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java new file mode 100644 index 0000000..61d1a75 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public enum SecurityProtocol { + /** Currently identical to PLAINTEXT and used for testing. Our plan is to add instrumentation in the future. */ + TRACE(0, "TRACE"), + /** Un-authenticated, non-encrypted channel */ + PLAINTEXT(1, "PLAINTEXT"); + + private static Map codeToSecurityProtocol = new HashMap(); + private static List names = new ArrayList(); + + static { + for (SecurityProtocol proto: SecurityProtocol.values()) { + codeToSecurityProtocol.put(proto.id, proto); + names.add(proto.name); + } + } + + /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */ + public final short id; + + /** A name of the security protocol. This may be used by client configuration. */ + public final String name; + + private SecurityProtocol(int id, String name) { + this.id = (short) id; + this.name = name; + } + + public static String getName(int id) { + return codeToSecurityProtocol.get(id).name; + } + + public static List getNames() { + return names; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 69530c1..e6823d9 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -27,7 +27,9 @@ import org.apache.kafka.common.KafkaException; public class Utils { - private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)"); + // This matches URIs of formats: host:port and protocol:\\host:port + // IPv6 is supported with [ip] pattern + private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-z\\-.:]*)\\]?:([0-9]+)"); public static final String NL = System.getProperty("line.separator"); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 4c2ea34..dc69d14 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -31,9 +31,9 @@ public class UtilsTest { @Test public void testGetHost() { assertEquals("127.0.0.1", getHost("127.0.0.1:8000")); - assertEquals("mydomain.com", getHost("mydomain.com:8080")); + assertEquals("mydomain.com", getHost("PLAINTEXT://mydomain.com:8080")); assertEquals("::1", getHost("[::1]:1234")); - assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678")); + assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678")); } @Test diff --git a/config/server.properties b/config/server.properties index 1614260..80ee2fc 100644 --- a/config/server.properties +++ b/config/server.properties @@ -21,8 +21,10 @@ broker.id=0 ############################# Socket Server Settings ############################# +listeners=PLAINTEXT://:9092 + # The port the socket server listens on -port=9092 +#port=9092 # Hostname the broker will bind to. If not set, the server will bind to all interfaces #host.name=localhost diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index b700110..6b78d01 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -18,14 +18,15 @@ package kafka.admin import kafka.common._ -import kafka.cluster.Broker +import kafka.cluster.SecurityProtocol.SecurityProtocol +import kafka.cluster.{BrokerEndpoint, Broker, SecurityProtocol} + import kafka.log.LogConfig import kafka.utils._ import kafka.api.{TopicMetadata, PartitionMetadata} import java.util.Random import java.util.Properties -import scala.Some import scala.Predef._ import scala.collection._ import mutable.ListBuffer @@ -341,7 +342,9 @@ object AdminUtils extends Logging { topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo)) } - private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = { + + + private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = { if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) { val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) @@ -352,22 +355,22 @@ object AdminUtils extends Logging { val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) - var leaderInfo: Option[Broker] = None - var replicaInfo: Seq[Broker] = Nil - var isrInfo: Seq[Broker] = Nil + var leaderInfo: Option[BrokerEndpoint] = None + var replicaInfo: Seq[BrokerEndpoint] = Nil + var isrInfo: Seq[BrokerEndpoint] = Nil try { leaderInfo = leader match { case Some(l) => try { - Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) + Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head.getBrokerEndPoint(protocol)) } catch { case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e) } case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) } try { - replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) - isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas) + replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)).map(_.getBrokerEndPoint(protocol)) + isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas).map(_.getBrokerEndPoint(protocol)) } catch { case e: Throwable => throw new ReplicaNotAvailableException(e) } diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index f400b71..20126fb 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -196,9 +196,7 @@ object TopicCommand { } } } - - def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")" - + def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = { val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*""")) require(configsToBeAdded.forall(config => config.length == 2), diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala index 24aaf95..a3587e4 100644 --- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala @@ -18,18 +18,18 @@ package kafka.api import java.nio.ByteBuffer -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import kafka.common.ErrorMapping object ConsumerMetadataResponse { val CurrentVersion = 0 - private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1)) + private val NoBrokerOpt = Some(BrokerEndpoint(id = -1, host = "", port = -1)) def readFrom(buffer: ByteBuffer) = { val correlationId = buffer.getInt val errorCode = buffer.getShort - val broker = Broker.readFrom(buffer) + val broker = BrokerEndpoint.readFrom(buffer) val coordinatorOpt = if (errorCode == ErrorMapping.NoError) Some(broker) else @@ -40,7 +40,7 @@ object ConsumerMetadataResponse { } -case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int) +case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndpoint], errorCode: Short, correlationId: Int) extends RequestOrResponse() { def sizeInBytes = diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 4ff7e8f..bf93632 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -21,7 +21,7 @@ package kafka.api import java.nio._ import kafka.utils._ import kafka.api.ApiUtils._ -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import kafka.controller.LeaderIsrAndControllerEpoch import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping @@ -120,9 +120,9 @@ object LeaderAndIsrRequest { } val leadersCount = buffer.getInt - var leaders = Set[Broker]() + var leaders = Set[BrokerEndpoint]() for (i <- 0 until leadersCount) - leaders += Broker.readFrom(buffer) + leaders += BrokerEndpoint.readFrom(buffer) new LeaderAndIsrRequest(versionId, correlationId, clientId, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders) } @@ -134,10 +134,10 @@ case class LeaderAndIsrRequest (versionId: Short, controllerId: Int, controllerEpoch: Int, partitionStateInfos: Map[(String, Int), PartitionStateInfo], - leaders: Set[Broker]) + leaders: Set[BrokerEndpoint]) extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) { - def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int, + def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[BrokerEndpoint], controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String) = { this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, controllerId, controllerEpoch, partitionStateInfos, leaders) diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala index 0190076..76f8bc9 100644 --- a/core/src/main/scala/kafka/api/TopicMetadata.scala +++ b/core/src/main/scala/kafka/api/TopicMetadata.scala @@ -17,18 +17,17 @@ package kafka.api -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import java.nio.ByteBuffer import kafka.api.ApiUtils._ import kafka.utils.Logging import kafka.common._ -import org.apache.kafka.common.utils.Utils._ object TopicMetadata { val NoLeaderNodeId = -1 - def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): TopicMetadata = { + def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): TopicMetadata = { val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) val topic = readShortString(buffer) val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue)) @@ -89,7 +88,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat object PartitionMetadata { - def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): PartitionMetadata = { + def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): PartitionMetadata = { val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */ val leaderId = buffer.getInt @@ -110,10 +109,11 @@ object PartitionMetadata { } case class PartitionMetadata(partitionId: Int, - val leader: Option[Broker], - replicas: Seq[Broker], - isr: Seq[Broker] = Seq.empty, + val leader: Option[BrokerEndpoint], + replicas: Seq[BrokerEndpoint], + isr: Seq[BrokerEndpoint] = Seq.empty, errorCode: Short = ErrorMapping.NoError) extends Logging { + def sizeInBytes: Int = { 2 /* error code */ + 4 /* partition id */ + @@ -142,14 +142,13 @@ case class PartitionMetadata(partitionId: Int, override def toString(): String = { val partitionMetadataString = new StringBuilder partitionMetadataString.append("\tpartition " + partitionId) - partitionMetadataString.append("\tleader: " + (if(leader.isDefined) formatBroker(leader.get) else "none")) - partitionMetadataString.append("\treplicas: " + replicas.map(formatBroker).mkString(",")) - partitionMetadataString.append("\tisr: " + isr.map(formatBroker).mkString(",")) + partitionMetadataString.append("\tleader: " + (if(leader.isDefined) leader.get.toString else "none")) + partitionMetadataString.append("\treplicas: " + replicas.mkString(",")) + partitionMetadataString.append("\tisr: " + isr.mkString(",")) partitionMetadataString.append("\tisUnderReplicated: %s".format(if(isr.size < replicas.size) "true" else "false")) partitionMetadataString.toString() } - private def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")" } diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala index 92ac4e6..4de566b 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala @@ -17,7 +17,7 @@ package kafka.api -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import java.nio.ByteBuffer object TopicMetadataResponse { @@ -25,7 +25,7 @@ object TopicMetadataResponse { def readFrom(buffer: ByteBuffer): TopicMetadataResponse = { val correlationId = buffer.getInt val brokerCount = buffer.getInt - val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer)) + val brokers = (0 until brokerCount).map(_ => BrokerEndpoint.readFrom(buffer)) val brokerMap = brokers.map(b => (b.id, b)).toMap val topicCount = buffer.getInt val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap)) @@ -33,14 +33,16 @@ object TopicMetadataResponse { } } -case class TopicMetadataResponse(brokers: Seq[Broker], +case class TopicMetadataResponse(brokers: Seq[BrokerEndpoint], topicsMetadata: Seq[TopicMetadata], correlationId: Int) extends RequestOrResponse() { + val sizeInBytes: Int = { 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum } + def writeTo(buffer: ByteBuffer) { buffer.putInt(correlationId) /* brokers */ diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala index 530982e..cef3ec9 100644 --- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala @@ -17,15 +17,15 @@ package kafka.api import java.nio.ByteBuffer +import kafka.cluster.{SecurityProtocol, BrokerEndpoint, Broker} import kafka.api.ApiUtils._ -import kafka.cluster.Broker -import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.common.{KafkaException, ErrorMapping, TopicAndPartition} import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.network.RequestChannel.Response import collection.Set object UpdateMetadataRequest { - val CurrentVersion = 0.shortValue + val CurrentVersion = 1.shortValue val IsInit: Boolean = true val NotInit: Boolean = false val DefaultAckTimeout: Int = 1000 @@ -48,7 +48,14 @@ object UpdateMetadataRequest { } val numAliveBrokers = buffer.getInt - val aliveBrokers = for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer) + + val aliveBrokers = versionId match { + case 0 => for(i <- 0 until numAliveBrokers) yield new Broker(BrokerEndpoint.readFrom(buffer),SecurityProtocol.PLAINTEXT) + case 1 => for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer) + case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.") + } + + new UpdateMetadataRequest(versionId, correlationId, clientId, controllerId, controllerEpoch, partitionStateInfos.toMap, aliveBrokers.toSet) } @@ -82,7 +89,14 @@ case class UpdateMetadataRequest (versionId: Short, value.writeTo(buffer) } buffer.putInt(aliveBrokers.size) - aliveBrokers.foreach(_.writeTo(buffer)) + + versionId match { + case 0 => aliveBrokers.foreach(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).writeTo(buffer)) + case 1 => aliveBrokers.foreach(_.writeTo(buffer)) + case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.") + } + + } def sizeInBytes(): Int = { diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index ebba87f..614d4b8 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -18,17 +18,17 @@ import scala.collection._ import kafka.cluster._ +import kafka.cluster.SecurityProtocol.SecurityProtocol import kafka.api._ import kafka.producer._ import kafka.common.{ErrorMapping, KafkaException} import kafka.utils.{Utils, Logging} import java.util.Properties import util.Random - import kafka.network.BlockingChannel - import kafka.utils.ZkUtils._ - import org.I0Itec.zkclient.ZkClient - import java.io.IOException -import org.apache.kafka.common.utils.Utils.{getHost, getPort} +import kafka.network.BlockingChannel +import kafka.utils.ZkUtils._ +import org.I0Itec.zkclient.ZkClient +import java.io.IOException /** * Helper functions common to clients (producer, consumer, or admin) @@ -42,7 +42,7 @@ object ClientUtils extends Logging{ * @param producerConfig The producer's config * @return topic metadata response */ - def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { + def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { var fetchMetaDataSucceeded: Boolean = false var i: Int = 0 val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) @@ -83,7 +83,7 @@ object ClientUtils extends Logging{ * @param clientId The client's identifier * @return topic metadata response */ - def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int, + def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], clientId: String, timeoutMs: Int, correlationId: Int = 0): TopicMetadataResponse = { val props = new Properties() props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(",")) @@ -96,22 +96,22 @@ object ClientUtils extends Logging{ /** * Parse a list of broker urls in the form host1:port1, host2:port2, ... */ - def parseBrokerList(brokerListStr: String): Seq[Broker] = { + def parseBrokerList(brokerListStr: String): Seq[BrokerEndpoint] = { val brokersStr = Utils.parseCsvList(brokerListStr) brokersStr.zipWithIndex.map { case (address, brokerId) => - new Broker(brokerId, getHost(address), getPort(address)) + BrokerEndpoint.createBrokerEndPoint(brokerId, address) } } /** * Creates a blocking channel to a random broker */ - def channelToAnyBroker(zkClient: ZkClient, socketTimeoutMs: Int = 3000) : BlockingChannel = { + def channelToAnyBroker(zkClient: ZkClient, protocolType: SecurityProtocol, socketTimeoutMs: Int = 3000) : BlockingChannel = { var channel: BlockingChannel = null var connected = false while (!connected) { - val allBrokers = getAllBrokersInCluster(zkClient) + val allBrokers = getAllBrokerEndPointsForChannel(zkClient, protocolType) Random.shuffle(allBrokers).find { broker => trace("Connecting to broker %s:%d.".format(broker.host, broker.port)) try { @@ -136,19 +136,19 @@ object ClientUtils extends Logging{ /** * Creates a blocking channel to the offset manager of the given group */ - def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = { - var queryChannel = channelToAnyBroker(zkClient) + def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000, protocolType: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { + var queryChannel = channelToAnyBroker(zkClient, protocolType) var offsetManagerChannelOpt: Option[BlockingChannel] = None while (!offsetManagerChannelOpt.isDefined) { - var coordinatorOpt: Option[Broker] = None + var coordinatorOpt: Option[BrokerEndpoint] = None while (!coordinatorOpt.isDefined) { try { if (!queryChannel.isConnected) - queryChannel = channelToAnyBroker(zkClient) + queryChannel = channelToAnyBroker(zkClient, protocolType) debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group)) queryChannel.send(ConsumerMetadataRequest(group)) val response = queryChannel.receive() diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 0060add..7647d85 100644 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -18,17 +18,39 @@ package kafka.cluster import kafka.utils.Utils._ -import kafka.utils.Json -import kafka.api.ApiUtils._ +import kafka.utils.{Utils, Json} import java.nio.ByteBuffer -import kafka.common.{KafkaException, BrokerNotAvailableException} -import org.apache.kafka.common.utils.Utils._ +import kafka.common.{BrokerEndPointNotAvailableException, KafkaException, BrokerNotAvailableException} +import kafka.cluster.SecurityProtocol._ /** - * A Kafka broker + * A Kafka broker. + * A broker has an id, a host and a collection of end-points. + * Each end-point is (port,protocolType). + * Currently the only protocol type is PlainText but we will add SSL and Kerberos in the future. */ object Broker { + /** + * Create a broker object from id and JSON string. + * @param id + * @param brokerInfoString + * + * Version 1 JSON schema for a broker is: + * {"version":1, + * "host":"localhost", + * "port":9092 + * "jmx_port":9999, + * "timestamp":"2233345666" } + * + * The current JSON schema for a broker is: + * {"version":2, + * "host","localhost", + * "port",9092 + * "jmx_port":9999, + * "timestamp":"2233345666", + * "endpoints": "{PLAINTEXT://host1:9092,SSL://host1:9093"} + */ def createBroker(id: Int, brokerInfoString: String): Broker = { if(brokerInfoString == null) throw new BrokerNotAvailableException("Broker id %s does not exist".format(id)) @@ -36,9 +58,18 @@ object Broker { Json.parseFull(brokerInfoString) match { case Some(m) => val brokerInfo = m.asInstanceOf[Map[String, Any]] - val host = brokerInfo.get("host").get.asInstanceOf[String] - val port = brokerInfo.get("port").get.asInstanceOf[Int] - new Broker(id, host, port) + val version = brokerInfo.get("version").get.asInstanceOf[Int] + val endpoints = version match { + case 1 => + val host = brokerInfo.get("host").get.asInstanceOf[String] + val port = brokerInfo.get("port").get.asInstanceOf[Int] + Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT)) + case 2 => + val listeners = brokerInfo.get("endpoints").get.asInstanceOf[String] + Utils.listenerListToEndPoints(listeners) + case _ => throw new KafkaException("Unknown version of broker registration. Only versions 1 and 2 are supported." + brokerInfoString) + } + new Broker(id, endpoints) case None => throw new BrokerNotAvailableException("Broker id %d does not exist".format(id)) } @@ -47,36 +78,79 @@ object Broker { } } + /** + * + * @param buffer Containing serialized broker. + * Current serialization is: + * id (int), host (size + string), number of endpoints (int), serialized endpoints + * @return broker object + */ def readFrom(buffer: ByteBuffer): Broker = { val id = buffer.getInt - val host = readShortString(buffer) - val port = buffer.getInt - new Broker(id, host, port) + val numEndpoints = buffer.getInt + + val endpoints = List.range(0, numEndpoints).map(i => EndPoint.readFrom(buffer)) + .map(ep => ep.protocolType -> ep).toMap + new Broker(id, endpoints) } } -case class Broker(id: Int, host: String, port: Int) { - - override def toString: String = "id:" + id + ",host:" + host + ",port:" + port +case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) { + + override def toString: String = id + " : " + endPoints.values.mkString("(",",",")") + + def this(id: Int, host: String, port: Int, protocol: SecurityProtocol) = { + this(id, Map(protocol -> EndPoint(host, port, protocol))) + } + + def this(bep: BrokerEndpoint, protocol: SecurityProtocol) = { + this(bep.id, bep.host, bep.port, protocol) + } - def connectionString: String = formatAddress(host, port) def writeTo(buffer: ByteBuffer) { buffer.putInt(id) - writeShortString(buffer, host) - buffer.putInt(port) + buffer.putInt(endPoints.size) + for(endpoint <- endPoints.values) { + endpoint.writeTo(buffer) + } + } + + def sizeInBytes: Int = + 4 + /* broker id*/ + 4 + /* number of endPoints */ + endPoints.values.map(_.sizeInBytes).sum /* end points */ + + def supportsChannel(protocolType: SecurityProtocol): Unit = { + endPoints.contains(protocolType) } - def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ + def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndpoint = { + val endpoint = endPoints.get(protocolType) + endpoint match { + case Some(endpoint) => new BrokerEndpoint(id, endpoint.host, endpoint.port) + case None => + throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)) + } + + + } override def equals(obj: Any): Boolean = { obj match { case null => false - case n: Broker => id == n.id && host == n.host && port == n.port + // Yes, Scala compares lists element by element + case n: Broker => id == n.id && endPoints == n.endPoints case _ => false } } - - override def hashCode(): Int = hashcode(id, host, port) + + + override def hashCode(): Int = hashcode(id, endPoints) } + + + + + diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala new file mode 100644 index 0000000..22dba18 --- /dev/null +++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.cluster + +import java.nio.ByteBuffer + +import kafka.api.ApiUtils._ +import kafka.common.KafkaException +import org.apache.kafka.common.utils.Utils._ + +object BrokerEndpoint { + def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndpoint = { + + // BrokerEndPoint URI is host:port or [ipv6_host]:port + // Note that unlike EndPoint (or listener) this URI has no security information. + val uriParseExp = """\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r + + connectionString match { + case uriParseExp(host, port) => new BrokerEndpoint(brokerId, host, port.toInt) + case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint") + } + } + + def readFrom(buffer: ByteBuffer): BrokerEndpoint = { + val brokerId = buffer.getInt() + val host = readShortString(buffer) + val port = buffer.getInt() + BrokerEndpoint(brokerId, host, port) + } +} + +/** + * BrokerEndpoint is used to connect to specific host:port pair. + * It is typically used by clients (or brokers when connecting to other brokers) + * and contains no information about the security protocol used on the connection. + * Clients should know which security protocol to use from configuration. + * This allows us to keep the wire protocol with the clients unchanged where the protocol is not needed. + */ +case class BrokerEndpoint(id: Int, host: String, port: Int) { + + def connectionString(): String = formatAddress(host, port) + + def writeTo(buffer: ByteBuffer): Unit = { + buffer.putInt(id) + writeShortString(buffer, host) + buffer.putInt(port) + } + + def sizeInBytes: Int = + 4 + /* broker Id */ + 4 + /* port */ + shortStringLength(host) +} diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala new file mode 100644 index 0000000..012d1fd --- /dev/null +++ b/core/src/main/scala/kafka/cluster/EndPoint.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.cluster + +import java.nio.ByteBuffer + +import kafka.api.ApiUtils._ +import kafka.common.KafkaException +import kafka.cluster.SecurityProtocol._ + +object EndPoint { + + def readFrom(buffer: ByteBuffer): EndPoint = { + val port = buffer.getInt() + val host = readShortString(buffer) + val protocol = buffer.getShort() + EndPoint(host, port, SecurityProtocol(protocol)) + } + + def createEndPoint(connectionString: String): EndPoint = { + val uriParseExp = """^(.*)://\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r + connectionString match { + case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.withName(protocol)) + case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.withName(protocol)) + case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint") + } + } +} + +/** + * Part of the broker definition - matching host/port pair to a protocol + */ +case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) { + + override def toString: String = { + val hostStr = if (host == null || host.contains(":")) "[" + host + "]" else host + protocolType + "://" + hostStr + ":" + port + } + + def writeTo(buffer: ByteBuffer): Unit = { + buffer.putInt(port) + writeShortString(buffer, host) + buffer.putShort(protocolType.id.toShort) + } + + def sizeInBytes: Int = + 4 + /* port */ + shortStringLength(host) + + 2 /* protocol id */ +} diff --git a/core/src/main/scala/kafka/cluster/SecurityProtocol.scala b/core/src/main/scala/kafka/cluster/SecurityProtocol.scala new file mode 100644 index 0000000..ca7105c --- /dev/null +++ b/core/src/main/scala/kafka/cluster/SecurityProtocol.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.cluster + + +object SecurityProtocol extends Enumeration { + + type SecurityProtocol = Value + val TRACE = Value + val PLAINTEXT = Value +} + diff --git a/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala b/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala new file mode 100644 index 0000000..455d8c6 --- /dev/null +++ b/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +class BrokerEndPointNotAvailableException(message: String) extends RuntimeException(message) { + def this() = this(null) +} diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 9ebbee6..4112343 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -19,6 +19,7 @@ package kafka.consumer import java.util.Properties import kafka.api.OffsetRequest +import kafka.cluster.SecurityProtocol import kafka.utils._ import kafka.common.{InvalidConfigException, Config} @@ -180,6 +181,12 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */ val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy) + + /** + * Only PLAINTEXT protocol is supported on the scala consumer, so no need to set this + * This parameter is used for testing. + **/ + val securityProtocol = SecurityProtocol.withName(props.getString("security.protocol", "PLAINTEXT")) validate(this) } diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index b9e2bea..59e9876 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -19,9 +19,8 @@ package kafka.consumer import org.I0Itec.zkclient.ZkClient import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager} -import kafka.cluster.{Cluster, Broker} +import kafka.cluster.{BrokerEndpoint, Cluster} import scala.collection.immutable -import scala.collection.Map import collection.mutable.HashMap import scala.collection.mutable import java.util.concurrent.locks.ReentrantLock @@ -53,7 +52,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, private class LeaderFinderThread(name: String) extends ShutdownableThread(name) { // thread responsible for adding the fetcher to the right broker when leader is available override def doWork() { - val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker] + val leaderForPartitionsMap = new HashMap[TopicAndPartition, BrokerEndpoint] lock.lock() try { while (noLeaderPartitionSet.isEmpty) { @@ -62,7 +61,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, } trace("Partitions without leader %s".format(noLeaderPartitionSet)) - val brokers = getAllBrokersInCluster(zkClient) + val brokers = getAllBrokerEndPointsForChannel(zkClient, config.securityProtocol) val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId, @@ -114,7 +113,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, } } - override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = { new ConsumerFetcherThread( "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), config, sourceBroker, partitionMap, this) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index ee6139c..a51a678 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -17,7 +17,7 @@ package kafka.consumer -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import kafka.server.AbstractFetcherThread import kafka.message.ByteBufferMessageSet import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData} @@ -26,7 +26,7 @@ import kafka.common.TopicAndPartition class ConsumerFetcherThread(name: String, val config: ConsumerConfig, - sourceBroker: Broker, + sourceBroker: BrokerEndpoint, partitionMap: Map[TopicAndPartition, PartitionTopicInfo], val consumerFetcherManager: ConsumerFetcherManager) extends AbstractFetcherThread(name = name, diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 5487259..531b25d 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -185,7 +185,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def ensureOffsetManagerConnected() { if (config.offsetsStorage == "kafka") { if (offsetsChannel == null || !offsetsChannel.isConnected) - offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient, config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs) + offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient, + config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs, config.securityProtocol) debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port)) } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index c582191..7aafd0f 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -18,6 +18,7 @@ package kafka.controller import kafka.network.{Receive, BlockingChannel} import kafka.utils.{Utils, Logging, ShutdownableThread} +import org.apache.kafka.common.protocol.ApiVersion import collection.mutable.HashMap import kafka.cluster.Broker import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} @@ -80,7 +81,8 @@ class ControllerChannelManager (private val controllerContext: ControllerContext private def addNewBroker(broker: Broker) { val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize) debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id)) - val channel = new BlockingChannel(broker.host, broker.port, + val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol) + val channel = new BlockingChannel(brokerEndPoint.host, brokerEndPoint.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, config.controllerSocketTimeoutMs) @@ -284,7 +286,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging val broker = m._1 val partitionStateInfos = m._2.toMap val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet - val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)) + val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) for (p <- partitionStateInfos) { val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" @@ -299,8 +301,10 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging updateMetadataRequestMap.foreach { m => val broker = m._1 val partitionStateInfos = m._2.toMap - val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId, - partitionStateInfos, controllerContext.liveOrShuttingDownBrokers) + + val versionId = if (controller.config.intraBrokerProtocolVersion.onOrAfter(ApiVersion.KAFKA_0830)) 1 else 0 + val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch, + correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers) partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, correlationId, broker, p._1))) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index e9b4dc6..88dfa19 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -212,7 +212,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt def epoch = controllerContext.epoch - def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port) + def clientId = { + val listeners = config.listeners + val controllerListener = listeners.get(config.interBrokerSecurityProtocol) + "id_%d-host_%s-port_%d".format(config.brokerId, controllerListener.get.host, controllerListener.get.port) + } /** * On clean shutdown, the controller first determines the partitions that the diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala index d281bb3..9c14428 100644 --- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala +++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala @@ -18,14 +18,13 @@ package kafka.javaapi import java.nio.ByteBuffer - -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) { def errorCode = underlying.errorCode - def coordinator: Broker = { + def coordinator: BrokerEndpoint = { import kafka.javaapi.Implicits._ underlying.coordinatorOpt } diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala index f384e04..ebbd589 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala @@ -16,7 +16,7 @@ */ package kafka.javaapi -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import scala.collection.JavaConversions private[javaapi] object MetadataListImplicits { @@ -52,17 +52,17 @@ class TopicMetadata(private val underlying: kafka.api.TopicMetadata) { class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) { def partitionId: Int = underlying.partitionId - def leader: Broker = { + def leader: BrokerEndpoint = { import kafka.javaapi.Implicits._ underlying.leader } - def replicas: java.util.List[Broker] = { + def replicas: java.util.List[BrokerEndpoint] = { import JavaConversions._ underlying.replicas } - def isr: java.util.List[Broker] = { + def isr: java.util.List[BrokerEndpoint] = { import JavaConversions._ underlying.isr } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 7b1db3d..c4bad46 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -18,6 +18,8 @@ package kafka.network import java.util.concurrent._ +import kafka.cluster.SecurityProtocol +import kafka.cluster.SecurityProtocol.SecurityProtocol import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.nio.ByteBuffer @@ -30,7 +32,7 @@ import org.apache.log4j.Logger object RequestChannel extends Logging { - val AllDone = new Request(1, 2, getShutdownReceive(), 0) + val AllDone = new Request(processor = 1, requestKey = 2, buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) def getShutdownReceive() = { val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) @@ -41,7 +43,7 @@ object RequestChannel extends Logging { byteBuffer } - case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) { + case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0), securityProtocol: SecurityProtocol) { @volatile var requestDequeueTimeMs = -1L @volatile var apiLocalCompleteTimeMs = -1L @volatile var responseCompleteTimeMs = -1L diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 76ce41a..080a22b 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -24,7 +24,11 @@ import java.net._ import java.io._ import java.nio.channels._ +import kafka.cluster.{SecurityProtocol, EndPoint} +import kafka.cluster.SecurityProtocol.SecurityProtocol + import scala.collection._ +import scala.collection.JavaConversions._ import kafka.common.KafkaException import kafka.metrics.KafkaMetricsGroup @@ -38,8 +42,7 @@ import com.yammer.metrics.core.{Gauge, Meter} * M Handler threads that handle requests and produce responses back to the processor threads for writing. */ class SocketServer(val brokerId: Int, - val host: String, - val port: Int, + val endpoints: Map[SecurityProtocol, EndPoint], val numProcessorThreads: Int, val maxQueuedRequests: Int, val sendBufferSize: Int, @@ -51,28 +54,39 @@ class SocketServer(val brokerId: Int, this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) - @volatile private var acceptor: Acceptor = null + @volatile private[network] var acceptors: ConcurrentHashMap[EndPoint,Acceptor] = new ConcurrentHashMap[EndPoint,Acceptor]() val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) /* a meter to track the average free capacity of the network processors */ private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) + + /* I'm pushing the mapping of port-to-protocol to the processor level, + so the processor can put the correct protocol in the request channel. + we'll probably have a more elegant way of doing this once we patch the request channel + to include more information about security and authentication. + TODO: re-consider this code when working on KAFKA-1683 + */ + private val portToProtocol: Map[Int, SecurityProtocol] = + endpoints.map{ case (protocol: SecurityProtocol, endpoint: EndPoint) => (endpoint.port -> protocol )} + /** * Start the socket server */ def startup() { val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) for(i <- 0 until numProcessorThreads) { - processors(i) = new Processor(i, - time, - maxRequestSize, + processors(i) = new Processor(i, + time, + maxRequestSize, aggregateIdleMeter, newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)), - numProcessorThreads, + numProcessorThreads, requestChannel, quotas, - connectionsMaxIdleMs) - Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start() + connectionsMaxIdleMs, + portToProtocol) + Utils.newThread("kafka-network-thread-%d".format(i), processors(i), false).start() } newGauge("ResponsesBeingSent", new Gauge[Int] { @@ -83,10 +97,17 @@ class SocketServer(val brokerId: Int, requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) // start accepting connections - this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas) - Utils.newThread("kafka-socket-acceptor", acceptor, false).start() - acceptor.awaitStartup - info("Started") + // right now we will use the same processors for all ports, since we didn't implement different protocols + // in the future, we may implement different processors for SSL and Kerberos + + endpoints.values.foreach(endpoint => { + val acceptor = new Acceptor(endpoint.host, endpoint.port, processors, sendBufferSize, recvBufferSize, quotas) + acceptors.put(endpoint,acceptor) + Utils.newThread("kafka-socket-acceptor-%s-%d".format(endpoint.protocolType.toString, endpoint.port), acceptor, false).start() + acceptor.awaitStartup + }) + + info("Started " + acceptors.size() + " acceptor threads") } /** @@ -94,8 +115,8 @@ class SocketServer(val brokerId: Int, */ def shutdown() = { info("Shutting down") - if(acceptor != null) - acceptor.shutdown() + if(acceptors != null) + acceptors.values().foreach(_.shutdown()) for(processor <- processors) processor.shutdown() info("Shutdown completed") @@ -301,7 +322,8 @@ private[kafka] class Processor(val id: Int, val totalProcessorThreads: Int, val requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, - val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) { + val connectionsMaxIdleMs: Long, + val portToProtocol: Map[Int,SecurityProtocol]) extends AbstractServerThread(connectionQuotas) { private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() private val connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000 @@ -447,7 +469,9 @@ private[kafka] class Processor(val id: Int, if(read < 0) { close(key) } else if(receive.complete) { - val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) + val port = socketChannel.socket().getLocalPort + val protocol = portToProtocol(port) + val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address, securityProtocol = protocol) requestChannel.sendRequest(req) key.attach(null) // explicitly reset interest ops to not READ, no need to wake up the selector just yet diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala index 43df70b..362623f 100644 --- a/core/src/main/scala/kafka/producer/ProducerPool.scala +++ b/core/src/main/scala/kafka/producer/ProducerPool.scala @@ -17,7 +17,7 @@ package kafka.producer -import kafka.cluster.Broker +import kafka.cluster.{BrokerEndpoint, Broker} import java.util.Properties import collection.mutable.HashMap import java.lang.Object @@ -30,7 +30,7 @@ object ProducerPool { /** * Used in ProducerPool to initiate a SyncProducer connection with a broker. */ - def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = { + def createSyncProducer(config: ProducerConfig, broker: BrokerEndpoint): SyncProducer = { val props = new Properties() props.put("host", broker.host) props.put("port", broker.port.toString) @@ -44,11 +44,12 @@ class ProducerPool(val config: ProducerConfig) extends Logging { private val lock = new Object() def updateProducer(topicMetadata: Seq[TopicMetadata]) { - val newBrokers = new collection.mutable.HashSet[Broker] + val newBrokers = new collection.mutable.HashSet[BrokerEndpoint] topicMetadata.foreach(tmd => { tmd.partitionsMetadata.foreach(pmd => { - if(pmd.leader.isDefined) - newBrokers+=(pmd.leader.get) + if(pmd.leader.isDefined) { + newBrokers += pmd.leader.get + } }) }) lock synchronized { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 20c00cb..94aa952 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import scala.collection.Set import scala.collection.Map import kafka.utils.{Utils, Logging} -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition import com.yammer.metrics.core.Gauge @@ -68,7 +68,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri } // to be defined in subclass to create a specific fetcher - def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread + def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) { mapLock synchronized { @@ -126,6 +126,6 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri } } -case class BrokerAndFetcherId(broker: Broker, fetcherId: Int) +case class BrokerAndFetcherId(broker: BrokerEndpoint, fetcherId: Int) -case class BrokerAndInitialOffset(broker: Broker, initOffset: Long) \ No newline at end of file +case class BrokerAndInitialOffset(broker: BrokerEndpoint, initOffset: Long) \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 8c281d4..210369a 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import kafka.utils.{Pool, ShutdownableThread} import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} @@ -36,7 +36,7 @@ import com.yammer.metrics.core.Gauge /** * Abstract class for fetching data from multiple partitions from the same broker. */ -abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, +abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndpoint, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, isInterruptible: Boolean = true) extends ShutdownableThread(name, isInterruptible) { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 703886a..5e449fa 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -19,8 +19,8 @@ package kafka.server import org.apache.kafka.common.requests.JoinGroupResponse import org.apache.kafka.common.requests.HeartbeatResponse +import kafka.cluster.SecurityProtocol.SecurityProtocol import org.apache.kafka.common.TopicPartition - import kafka.api._ import kafka.admin.AdminUtils import kafka.common._ @@ -351,8 +351,8 @@ class KafkaApis(val requestChannel: RequestChannel, ret.toSeq.sortBy(- _) } - private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = { - val topicResponses = metadataCache.getTopicMetadata(topics) + private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[TopicMetadata] = { + val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol) if (topics.size > 0 && topicResponses.size != topics.size) { val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => @@ -394,10 +394,10 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] - val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet) + val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol) val brokers = metadataCache.getAliveBrokers trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) - val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId) + val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata, metadataRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } @@ -434,7 +434,7 @@ class KafkaApis(val requestChannel: RequestChannel, val partition = offsetManager.partitionFor(consumerMetadataRequest.group) // get metadata (and create the topic if necessary) - val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head + val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).head val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 14bf321..7de983f 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -18,22 +18,63 @@ package kafka.server import java.util.Properties +import kafka.cluster.SecurityProtocol._ +import kafka.cluster.{EndPoint, SecurityProtocol} +import kafka.common.InvalidConfigException import kafka.message.{MessageSet, Message} import kafka.consumer.ConsumerConfig import kafka.utils.{VerifiableProperties, ZKConfig, Utils} -import kafka.message.NoCompressionCodec import kafka.message.BrokerCompressionCodec +import org.apache.kafka.common.protocol.ApiVersion + +import scala.collection.immutable /** * Configuration settings for the kafka server */ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) { + def validateUniquePortAndProtocol(listeners: String) { + val listenerList = Utils.parseCsvList(listeners) + val endpoints = listenerList.map(listener => EndPoint.createEndPoint(listener)) + val distinctPorts = endpoints.map(ep => ep.port).distinct + val distinctProtocols = endpoints.map(ep => ep.protocolType).distinct + + if (distinctPorts.size < endpoints.size) throw new InvalidConfigException("Only one listener is allowed per port. Please check listeners for duplicates") + if (distinctProtocols.size < endpoints.size) throw new InvalidConfigException("Only one listener is allowed per protocol. Please check listeners for duplicates") + } + def this(originalProps: Properties) { this(new VerifiableProperties(originalProps)) props.verify() } + // If the user did not define listeners but did define host or port, lets use them in backward compatible way + // If none of those are defined, we default to PLAINTEXT://null:6667 + private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = { + if (props.containsKey("listeners")) { + validateUniquePortAndProtocol(props.getString("listeners")) + Utils.listenerListToEndPoints(props.getString("listeners")) + } else { + Utils.listenerListToEndPoints("PLAINTEXT://" + props.getString("host.name", "") + ":" + props.getInt("port", 6667).toString) + } + } + + // If the user defined advertised listeners, we use those + // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults + // If none of these are defined, we'll use the listeners + private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = { + if (props.containsKey("advertised.listeners")) { + validateUniquePortAndProtocol(props.getString("advertised.listeners")) + Utils.listenerListToEndPoints(props.getString("advertised.listeners")) + } else if (props.containsKey("advertised.host.name") || props.containsKey("advertised.port") ) { + Utils.listenerListToEndPoints("PLAINTEXT://" + props.getString("advertised.host.name", props.getString("host.name", "")) + + ":" + props.getInt("advertised.port", props.getInt("port", 6667)).toString) + } else { + getListeners() + } + } + private def getLogRetentionTimeMillis(): Long = { val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute @@ -99,23 +140,20 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /*********** Socket Server Configuration ***********/ - /* the port to listen and accept connections on */ - val port: Int = props.getInt("port", 9092) - - /* hostname of broker. If this is set, it will only bind to this address. If this is not set, - * it will bind to all interfaces */ - val hostName: String = props.getString("host.name", null) - - /* hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may - * need to be different from the interface to which the broker binds. If this is not set, - * it will use the value for "host.name" if configured. Otherwise - * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ - val advertisedHostName: String = props.getString("advertised.host.name", hostName) - - /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may - * need to be different from the port to which the broker binds. If this is not set, - * it will publish the same port that the broker binds to. */ - val advertisedPort: Int = props.getInt("advertised.port", port) + /* Listener List - Comma-separated list of URIs we will listen on and their protocols. + * Specify hostname as 0.0.0.0 to bind to all interfaces. + * Leave hostname empty to bind to default interface. + * + * Examples of legal listener lists: + * PLAINTEXT://myhost:9092,TRACE://:9091 + * PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093 + * */ + val listeners = getListeners() + + /* Listeners to publish to ZooKeeper for clients to use, if different than the listeners above. + * In IaaS environments, this may need to be different from the interface to which the broker binds. + * If this is not set, it will use the value for "listeners". */ + val advertisedListeners = getAdvertisedListeners() /* the SO_SNDBUF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) @@ -135,6 +173,16 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* idle connections timeout: the server socket processor threads close the connections that idle more than this */ val connectionsMaxIdleMs = props.getLong("connections.max.idle.ms", 10*60*1000L) + /* Security protocol used to communicate between brokers. Defaults to plain text. */ + val interBrokerSecurityProtocol = SecurityProtocol.withName(props.getString("security.intra.broker.protocol", "PLAINTEXT")) + + /** + * Specify which version of the inter-broker protocol will be used. + * This is typically bumped after all brokers were upgraded to a new version. + * Valid values are: 0.8.2.0, 0.8.3.0 + */ + val intraBrokerProtocolVersion = ApiVersion.parseConfig(props.getString("use.intra.broker.protocol.version", ApiVersion.getLatestVersion.toString)) + /*********** Log Configuration ***********/ /* the default number of log partitions per topic */ @@ -359,4 +407,5 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val compressionType = props.getString("compression.type", "producer").toLowerCase() require(BrokerCompressionCodec.isValid(compressionType), "compression.type : "+compressionType + " is not valid." + " Valid options are "+BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) + } diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 7907987..7a1378e 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -17,6 +17,8 @@ package kafka.server +import kafka.cluster.{SecurityProtocol, EndPoint} +import kafka.cluster.SecurityProtocol.SecurityProtocol import kafka.utils._ import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkStateListener, ZkClient} @@ -31,9 +33,8 @@ import java.net.InetAddress * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise * we are dead. */ -class KafkaHealthcheck(private val brokerId: Int, - private val advertisedHost: String, - private val advertisedPort: Int, +class KafkaHealthcheck(private val brokerId: Int, + private val advertisedEndpoints: Map[SecurityProtocol, EndPoint], private val zkSessionTimeoutMs: Int, private val zkClient: ZkClient) extends Logging { @@ -49,13 +50,18 @@ class KafkaHealthcheck(private val brokerId: Int, * Register this broker as "alive" in zookeeper */ def register() { - val advertisedHostName = - if(advertisedHost == null || advertisedHost.trim.isEmpty) - InetAddress.getLocalHost.getCanonicalHostName - else - advertisedHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort) + val updatedEndpoints = advertisedEndpoints.mapValues(endpoint => + if (endpoint.host == null || endpoint.host.trim.isEmpty) + EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType) + else + endpoint + ) + + // the default host and port are here for compatibility with older client + // only PLAINTEXT is supported as default + val defaultEndpoint = updatedEndpoints(SecurityProtocol.PLAINTEXT) + ZkUtils.registerBrokerInZk(zkClient, brokerId, defaultEndpoint.host, defaultEndpoint.port, updatedEndpoints, zkSessionTimeoutMs, jmxPort) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 426e522..048dfe9 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -25,6 +25,7 @@ import kafka.utils._ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File + import collection.mutable import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} @@ -109,23 +110,22 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg logManager = createLogManager(zkClient, brokerState) logManager.startup() + socketServer = new SocketServer(config.brokerId, + config.listeners, + config.numNetworkThreads, + config.queuedMaxRequests, + config.socketSendBufferBytes, + config.socketReceiveBufferBytes, + config.socketRequestMaxBytes, + config.maxConnectionsPerIp, + config.connectionsMaxIdleMs, + config.maxConnectionsPerIpOverrides) + socketServer.startup() + /* generate brokerId */ config.brokerId = getBrokerId this.logIdent = "[Kafka Server " + config.brokerId + "], " - socketServer = new SocketServer(config.brokerId, - config.hostName, - config.port, - config.numNetworkThreads, - config.queuedMaxRequests, - config.socketSendBufferBytes, - config.socketReceiveBufferBytes, - config.socketRequestMaxBytes, - config.maxConnectionsPerIp, - config.connectionsMaxIdleMs, - config.maxConnectionsPerIpOverrides) - socketServer.startup() - /* start replica manager */ replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) replicaManager.startup() @@ -153,7 +153,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager.startup() /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedListeners, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() /* register broker metrics */ @@ -238,7 +238,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if (channel != null) { channel.disconnect() } - channel = new BlockingChannel(broker.host, broker.port, + channel = new BlockingChannel(broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).host, + broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, config.controllerSocketTimeoutMs) @@ -413,7 +414,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg *
  • config has broker.id and meta.properties contains broker.id if they don't match throws InconsistentBrokerIdException *
  • config has broker.id and there is no meta.properties file, creates new meta.properties and stores broker.id *
      - * @returns A brokerId. + * @return A brokerId. */ private def getBrokerId: Int = { var brokerId = config.brokerId diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 4c70aa7..0bee903 100644 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -17,9 +17,12 @@ package kafka.server +import kafka.cluster.SecurityProtocol.SecurityProtocol +import kafka.cluster.{BrokerEndpoint,Broker} +import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException} +import kafka.common.TopicAndPartition + import kafka.api._ -import kafka.common._ -import kafka.cluster.Broker import kafka.controller.KafkaController.StateChangeLogger import scala.collection.{Seq, Set, mutable} import kafka.utils.Logging @@ -39,7 +42,8 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId) - def getTopicMetadata(topics: Set[String]) = { + def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol) = { + val isAllTopics = topics.isEmpty val topicsRequested = if(isAllTopics) cache.keySet else topics val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata] @@ -50,18 +54,21 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { val partitionMetadata = partitionStateInfos.map { case (partitionId, partitionState) => val replicas = partitionState.allReplicas - val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq - var leaderInfo: Option[Broker] = None - var isrInfo: Seq[Broker] = Nil + val replicaInfo: Seq[BrokerEndpoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol)) + var leaderInfo: Option[BrokerEndpoint] = None + var leaderBrokerInfo: Option[Broker] = None + var isrInfo: Seq[BrokerEndpoint] = Nil val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch val leader = leaderIsrAndEpoch.leaderAndIsr.leader val isr = leaderIsrAndEpoch.leaderAndIsr.isr val topicPartition = TopicAndPartition(topic, partitionId) try { - leaderInfo = aliveBrokers.get(leader) - if (!leaderInfo.isDefined) + leaderBrokerInfo = aliveBrokers.get(leader) + if (!leaderBrokerInfo.isDefined) throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition)) - isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null) + else + leaderInfo = Some(leaderBrokerInfo.get.getBrokerEndPoint(protocol)) + isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).map(_.getBrokerEndPoint(protocol)) if (replicaInfo.size < replicas.size) throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(",")) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index 351dbba..f0a2a5b 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -17,13 +17,13 @@ package kafka.server -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager) extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, "Replica", brokerConfig.numReplicaFetchers) { - override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = { new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr) } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 6879e73..b155cd1 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -18,14 +18,14 @@ package kafka.server import kafka.admin.AdminUtils -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet import kafka.api.{OffsetRequest, FetchResponsePartitionData} import kafka.common.{KafkaStorageException, TopicAndPartition} class ReplicaFetcherThread(name:String, - sourceBroker: Broker, + sourceBroker: BrokerEndpoint, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager) extends AbstractFetcherThread(name = name, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 586cf4c..7818ed8 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.api._ import kafka.common._ import kafka.utils._ -import kafka.cluster.{Broker, Partition, Replica} +import kafka.cluster.{BrokerEndpoint, Partition, Replica} import kafka.log.{LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController @@ -648,7 +648,7 @@ class ReplicaManager(val config: KafkaConfig, * the error message will be set on each partition since we do not know which partition caused it */ private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], - leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], + leaders: Set[BrokerEndpoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], offsetManager: OffsetManager) { partitionState.foreach { state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c43..03b121d 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -19,6 +19,7 @@ package kafka.tools import joptsimple._ +import kafka.cluster.SecurityProtocol import org.I0Itec.zkclient.ZkClient import kafka.utils._ import kafka.consumer.SimpleConsumer @@ -158,7 +159,7 @@ object ConsumerOffsetChecker extends Logging { topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*) val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq - val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) + val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs, SecurityProtocol.PLAINTEXT) debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) channel.send(OffsetFetchRequest(group, topicPartitions)) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index ba6ddd7..d1050b4 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -18,7 +18,7 @@ package kafka.tools import joptsimple.OptionParser -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet} import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference @@ -197,7 +197,7 @@ private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, c private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int], leadersPerBroker: Map[Int, Seq[TopicAndPartition]], expectedNumFetchers: Int, - brokerMap: Map[Int, Broker], + brokerMap: Map[Int, BrokerEndpoint], initialOffsetTime: Long, reportInterval: Long) extends Logging { private val fetchOffsetMap = new Pool[TopicAndPartition, Long] @@ -335,7 +335,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } } -private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition], +private class ReplicaFetcher(name: String, sourceBroker: BrokerEndpoint, topicAndPartitions: Iterable[TopicAndPartition], replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) extends ShutdownableThread(name) { diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index b4f903b..7379fe3 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -22,7 +22,7 @@ import kafka.utils._ import kafka.consumer._ import kafka.client.ClientUtils import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import scala.collection.JavaConversions._ import kafka.common.TopicAndPartition @@ -142,8 +142,8 @@ object SimpleConsumerShell extends Logging { } // validating replica id and initializing target broker - var fetchTargetBroker: Broker = null - var replicaOpt: Option[Broker] = null + var fetchTargetBroker: BrokerEndpoint = null + var replicaOpt: Option[BrokerEndpoint] = null if(replicaId == UseLeaderReplica) { replicaOpt = partitionMetadataOpt.get.leader if(!replicaOpt.isDefined) { @@ -167,7 +167,9 @@ object SimpleConsumerShell extends Logging { System.exit(1) } if (startingOffset < 0) { - val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout, + val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, + fetchTargetBroker.port, + ConsumerConfig.SocketTimeout, ConsumerConfig.SocketBufferSize, clientId) try { startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset, @@ -188,8 +190,12 @@ object SimpleConsumerShell extends Logging { val replicaString = if(replicaId > 0) "leader" else "replica" info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]" - .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) - val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId) + .format(topic, partitionId, replicaString, replicaId, + fetchTargetBroker.host, + fetchTargetBroker.port, startingOffset)) + val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, + fetchTargetBroker.port, + 10000, 64*1024, clientId) val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() { def run() { var offset = startingOffset diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala index 111c9a8..c6c5a88 100644 --- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala +++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala @@ -17,6 +17,7 @@ package kafka.tools +import kafka.cluster.SecurityProtocol import org.I0Itec.zkclient.ZkClient import kafka.consumer.{SimpleConsumer, ConsumerConfig} import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} @@ -65,7 +66,9 @@ object UpdateOffsetsInZK { ZkUtils.getBrokerInfo(zkClient, broker) match { case Some(brokerInfo) => - val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, "UpdateOffsetsInZk") + val consumer = new SimpleConsumer(brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host, + brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port, + 10000, 100 * 1024, "UpdateOffsetsInZk") val topicAndPartition = TopicAndPartition(topic, partition) val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1))) val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 738c1af..b2dec5c 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -24,11 +24,15 @@ import java.nio.channels._ import java.util.concurrent.locks.{ReadWriteLock, Lock} import java.lang.management._ import javax.management._ + +import kafka.cluster.SecurityProtocol.SecurityProtocol + import scala.collection._ import scala.collection.mutable import java.util.Properties import kafka.common.KafkaException import kafka.common.KafkaStorageException +import kafka.cluster.EndPoint /** @@ -607,4 +611,9 @@ object Utils extends Logging { .filter{ case (k,l) => (l > 1) } .keys } + + def listenerListToEndPoints(listeners: String): immutable.Map[SecurityProtocol, EndPoint] = { + val listenerList = parseCsvList(listeners) + listenerList.map(listener => EndPoint.createEndPoint(listener)).map(ep => ep.protocolType -> ep).toMap + } } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 7ae999e..2517fda 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,7 +17,8 @@ package kafka.utils -import kafka.cluster.{Broker, Cluster} +import kafka.cluster.SecurityProtocol.SecurityProtocol +import kafka.cluster._ import kafka.consumer.{ConsumerThreadId, TopicCount} import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, @@ -84,6 +85,10 @@ object ZkUtils extends Logging { brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) } + def getAllBrokerEndPointsForChannel(zkClient: ZkClient, protocolType: SecurityProtocol): Seq[BrokerEndpoint] = { + getAllBrokersInCluster(zkClient).map(_.getBrokerEndPoint(protocolType)) + } + def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr) } @@ -169,12 +174,28 @@ object ZkUtils extends Logging { } } - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) { + /** + * Register brokers with v2 json format (which includes multiple endpoints). + * This format also includes default endpoints for compatibility with older clients. + * @param zkClient + * @param id + * @param advertisedEndpoints + * @param timeout + * @param jmxPort + */ + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], timeout: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val timestamp = SystemTime.milliseconds.toString - val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp)) - val expectedBroker = new Broker(id, host, port) + val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.mkString(","), "jmx_port" -> jmxPort, "timestamp" -> timestamp)) + val expectedBroker = new Broker(id, advertisedEndpoints) + + registerBrokerInZk(zkClient, brokerIdPath, brokerInfo, expectedBroker, timeout) + + info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(","))) + } + + def registerBrokerInZk(zkClient: ZkClient, brokerIdPath: String, brokerInfo: String, expectedBroker: Broker, timeout: Int) { try { createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker, (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]), @@ -183,11 +204,10 @@ object ZkUtils extends Logging { } catch { case e: ZkNodeExistsException => throw new RuntimeException("A broker is already registered on the path " + brokerIdPath - + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " - + "else you have shutdown this broker and restarted it faster than the zookeeper " - + "timeout so it appears to be re-registering.") + + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + + "else you have shutdown this broker and restarted it faster than the zookeeper " + + "timeout so it appears to be re-registering.") } - info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) } def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = { diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index ba48a63..9167bbb 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -19,17 +19,16 @@ package kafka.api.test import org.junit.Test import org.junit.Assert._ - import java.lang.Integer import java.util.{Properties, Random} import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException} import kafka.api.FetchRequestBuilder import kafka.common.Topic -import kafka.consumer.SimpleConsumer import kafka.server.KafkaConfig import kafka.integration.KafkaServerTestHarness -import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils} +import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils, Utils} +import kafka.consumer.SimpleConsumer import org.apache.kafka.common.KafkaException import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException} @@ -67,8 +66,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { super.setUp() // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "") + + val endpoint1 = configs(0).listeners.values.head + val endpoint2 = configs(1).listeners.values.head + consumer1 = new SimpleConsumer("localhost", endpoint1.port, 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", endpoint2.port, 100, 1024*1024, "") producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize) producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) @@ -266,7 +268,6 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { server.shutdown() server.awaitShutdown() server.startup() - Thread.sleep(2000) } diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 8154a42..14a4ea9 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -25,7 +25,7 @@ import org.junit.Test import org.junit.Assert._ import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, TestUtils} +import kafka.utils.{Utils, TestZKUtils, TestUtils} import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.integration.KafkaServerTestHarness @@ -54,8 +54,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { super.setUp() // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "") + val endpoint1 = configs(0).listeners.values.head + val endpoint2 = configs(1).listeners.values.head + consumer1 = new SimpleConsumer("localhost", endpoint1.port, 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", endpoint2.port, 100, 1024*1024, "") } override def tearDown() { diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index a106379..13a865c 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -1,5 +1,6 @@ package other.kafka +import kafka.cluster.SecurityProtocol import org.I0Itec.zkclient.ZkClient import kafka.api._ import kafka.utils.{ShutdownableThread, ZKStringSerializer} @@ -110,7 +111,7 @@ object TestOffsetManager { private val fetchTimer = new KafkaTimer(timer) private val channels = mutable.Map[Int, BlockingChannel]() - private var metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs) + private var metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SecurityProtocol.PLAINTEXT, SocketTimeoutMs) private val numErrors = new AtomicInteger(0) @@ -156,7 +157,7 @@ object TestOffsetManager { println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port)) metadataChannel.disconnect() println("Creating new query channel.") - metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs) + metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SecurityProtocol.PLAINTEXT, SocketTimeoutMs) } finally { Thread.sleep(fetchIntervalMs) diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index 4d36b8b..af69c05 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -18,8 +18,14 @@ package unit.kafka import java.io.{FileOutputStream, File} import java.security.Permission +import java.util.Properties import kafka.Kafka +import kafka.cluster.SecurityProtocol +import kafka.common.InvalidConfigException +import kafka.server.KafkaConfig +import kafka.utils.Utils +import org.apache.kafka.common.protocol.ApiVersion import org.junit.{After, Before, Test} import junit.framework.Assert._ @@ -65,14 +71,14 @@ class KafkaTest { assertEquals(2, config2.brokerId) // We should be also able to set completely new property - val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987")) + val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")) assertEquals(1, config3.brokerId) - assertEquals(1987, config3.port) + assertEquals("compact", config3.logCleanupPolicy) // We should be also able to set several properties - val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987", "--override", "broker.id=2")) + val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact", "--override", "broker.id=2")) assertEquals(2, config4.brokerId) - assertEquals(1987, config4.port) + assertEquals("compact", config4.logCleanupPolicy) } @Test(expected = classOf[ExitCalled]) @@ -99,6 +105,81 @@ class KafkaTest { Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2")) } + @Test + def testDuplicateListeners() { + val props = new Properties() + props.put("broker.id", "1") + props.put("zookeeper.connect", "localhost:2181") + + // listeners with duplicate port + props.put("listeners", "PLAINTEXT://localhost:9091,TRACE://localhost:9091") + + assert(!isValidKafkaConfig(props)) + + // listeners with duplicate protocol + props.put("listeners", "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092") + + assert(!isValidKafkaConfig(props)) + + // advertised listeners with duplicate port + props.put("advertised,listeners", "PLAINTEXT://localhost:9091,TRACE://localhost:9091") + + assert(!isValidKafkaConfig(props)) + } + + @Test + def testListenerDefaults() { + val props = new Properties() + props.put("broker.id", "1") + props.put("zookeeper.connect", "localhost:2181") + + // configuration with host and port, but no listeners + props.put("host.name", "myhost") + props.put("port", "1111") + + val conf = new KafkaConfig(props) + assertEquals(Utils.listenerListToEndPoints("PLAINTEXT://myhost:1111"), conf.listeners) + + // configuration with null host + props.remove("host.name") + + val conf2 = new KafkaConfig(props) + assertEquals(Utils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners) + assertEquals(null, conf2.listeners(SecurityProtocol.PLAINTEXT).host) + + // configuration with advertised host and port, and no advertised listeners + props.put("advertised.host.name", "otherhost") + props.put("advertised.port", "2222") + + val conf3 = new KafkaConfig(props) + assertEquals(conf3.advertisedListeners, Utils.listenerListToEndPoints("PLAINTEXT://otherhost:2222")) + + + } + + @Test + def testVersionConfiguration() { + val props = new Properties() + props.put("broker.id", "1") + props.put("zookeeper.connect", "localhost:2181") + val conf = new KafkaConfig(props) + assertEquals(conf.intraBrokerProtocolVersion, ApiVersion.getLatestVersion) + + props.put("use.intra.broker.protocol.version","0.8.2.0") + val conf2 = new KafkaConfig(props) + assertEquals(conf2.intraBrokerProtocolVersion, ApiVersion.KAFKA_0820) + } + + + def isValidKafkaConfig(props: Properties): Boolean = { + try { + new KafkaConfig(props) + true + } catch { + case e: InvalidConfigException => false + } + } + def prepareDefaultConfig(): String = { prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere")) } diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 1bf2667..73be3f2 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -22,7 +22,7 @@ import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} -import kafka.cluster.Broker +import kafka.cluster.{SecurityProtocol, Broker} import kafka.client.ClientUtils import kafka.server.{KafkaConfig, KafkaServer} @@ -61,7 +61,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { val server4 = TestUtils.createServer(new KafkaConfig(configProps4)) servers ++= List(server1, server2, server3, server4) - brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port)) + brokers = servers.map(s => new Broker(s.config.brokerId, s.config.listeners)) // create topics first createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) @@ -109,7 +109,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions", + val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testIncrementPartitions", 2000,0).topicsMetadata val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata @@ -134,7 +134,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas", + val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testManualAssignmentOfReplicas", 2000,0).topicsMetadata val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2)) val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata @@ -158,7 +158,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5) TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement", + val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacement", 2000,0).topicsMetadata val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index fba852a..0b51b82 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -22,7 +22,7 @@ import org.scalatest.junit.JUnitSuite import junit.framework.Assert._ import java.nio.ByteBuffer import kafka.message.{Message, ByteBufferMessageSet} -import kafka.cluster.Broker +import kafka.cluster.{BrokerEndpoint, SecurityProtocol, EndPoint, Broker} import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError} import kafka.utils.SystemTime import org.apache.kafka.common.requests._ @@ -80,11 +80,15 @@ object SerializationTestUtils { TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100) ) - private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013)) - private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0) - private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1) - private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2) - private val partitionMetaData3 = new PartitionMetadata(3, Some(brokers.head), replicas = brokers, isr = brokers.tail.tail, errorCode = 3) + private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))), + new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))), + new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT)))) + private val brokerEndpoints = brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) + + private val partitionMetaData0 = new PartitionMetadata(0, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 0) + private val partitionMetaData1 = new PartitionMetadata(1, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail, errorCode = 1) + private val partitionMetaData2 = new PartitionMetadata(2, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 2) + private val partitionMetaData3 = new PartitionMetadata(3, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail.tail, errorCode = 3) private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3) private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) @@ -94,7 +98,7 @@ object SerializationTestUtils { val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1) val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, isr1.toSet)), ((topic2, 0), PartitionStateInfo(leaderAndIsr2, isr2.toSet))) - new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "") + new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[BrokerEndpoint](), 0, 1, 0, "") } def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = { @@ -148,7 +152,7 @@ object SerializationTestUtils { } def createTestTopicMetadataResponse: TopicMetadataResponse = { - new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1) + new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)).toVector, Seq(topicmetaData1, topicmetaData2), 1) } def createTestOffsetCommitRequestV1: OffsetCommitRequest = { @@ -192,7 +196,7 @@ object SerializationTestUtils { } def createConsumerMetadataResponse: ConsumerMetadataResponse = { - ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0) + ConsumerMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0) } def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = { diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerTest.scala new file mode 100644 index 0000000..b9c7fd1 --- /dev/null +++ b/core/src/test/scala/unit/kafka/cluster/BrokerTest.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.cluster + +import java.nio.ByteBuffer + +import kafka.utils.Logging +import org.junit.Test +import org.scalatest.junit.JUnit3Suite + +import scala.collection.mutable + +class BrokerTest extends JUnit3Suite with Logging { + + @Test + def testSerDe() = { + + val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) + val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint) + val origBroker = new Broker(1, listEndPoints) + val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes) + + origBroker.writeTo(brokerBytes) + + val newBroker = Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer]) + assert(origBroker == newBroker) + } + + @Test + def testHashAndEquals() = { + val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) + val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) + val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT) + val endpoint4 = new EndPoint("other", 1111, SecurityProtocol.PLAINTEXT) + val broker1 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint1)) + val broker2 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint2)) + val broker3 = new Broker(2, Map(SecurityProtocol.PLAINTEXT -> endpoint3)) + val broker4 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint4)) + + assert(broker1 == broker2) + assert(broker1 != broker3) + assert(broker1 != broker4) + assert(broker1.hashCode() == broker2.hashCode()) + assert(broker1.hashCode() != broker3.hashCode()) + assert(broker1.hashCode() != broker4.hashCode()) + + val hashmap = new mutable.HashMap[Broker, Int]() + hashmap.put(broker1, 1) + assert(hashmap.getOrElse(broker1, -1) == 1) + } + + @Test + def testFromJSON() = { + val brokerInfoStr = "{\"version\":2," + + "\"host\":\"localhost\"," + + "\"port\":9092," + + "\"jmx_port\":9999," + + "\"timestamp\":\"1416974968782\"," + + "\"endpoints\":\"PLAINTEXT://localhost:9092\"}" + val broker = Broker.createBroker(1, brokerInfoStr) + assert(broker.id == 1) + assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "localhost") + assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9092) + } + + @Test + def testFromOldJSON() = { + val brokerInfoStr = "{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}" + val broker = Broker.createBroker(1, brokerInfoStr) + assert(broker.id == 1) + assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "172.16.8.243") + assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9091) + } + + @Test + def testBrokerEndpointFromURI() = { + var connectionString = "localhost:9092" + var endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString) + assert(endpoint.host == "localhost") + assert(endpoint.port == 9092) + // also test for ipv6 + connectionString = "[::1]:9092" + endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString) + assert(endpoint.host == "::1") + assert(endpoint.port == 9092) + } + + @Test + def testEndpointFromURI() = { + var connectionString = "PLAINTEXT://localhost:9092" + var endpoint = EndPoint.createEndPoint(connectionString) + assert(endpoint.host == "localhost") + assert(endpoint.port == 9092) + assert(endpoint.toString == "PLAINTEXT://localhost:9092") + // also test for default bind + connectionString = "PLAINTEXT://:9092" + endpoint = EndPoint.createEndPoint(connectionString) + assert(endpoint.host == null) + assert(endpoint.port == 9092) + assert(endpoint.toString == "PLAINTEXT://[null]:9092") + // also test for ipv6 + connectionString = "PLAINTEXT://[::1]:9092" + endpoint = EndPoint.createEndPoint(connectionString) + assert(endpoint.host == "::1") + assert(endpoint.port == 9092) + assert(endpoint.toString == "PLAINTEXT://[::1]:9092") + } + + + + + + +} diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index c0355cc..3edfc75 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -46,7 +46,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val group = "group1" val consumer0 = "consumer0" val consumedOffset = 5 - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) + val cluster = new Cluster(configs.map(c => new Broker(c.brokerId,c.listeners))) val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, 0, diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 25845ab..7ac76e4 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -29,7 +29,7 @@ import kafka.consumer._ import kafka.serializer._ import kafka.producer.{KeyedMessage, Producer} import kafka.utils.TestUtils._ -import kafka.utils.TestUtils +import kafka.utils.{Utils, TestUtils} class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { @@ -39,7 +39,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { yield new KafkaConfig(props) val messages = new mutable.HashMap[Int, Seq[Array[Byte]]] val topic = "topic" - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) + val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.listeners))) val shutdown = ZookeeperConsumerConnector.shutdownCommand val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index dc0512b..d2b27b5 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -18,6 +18,8 @@ package kafka.integration import java.util.Arrays +import kafka.cluster.SecurityProtocol + import scala.collection.mutable.Buffer import kafka.server._ import kafka.utils.{Utils, TestUtils} @@ -38,7 +40,10 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { def serverForId(id: Int) = servers.find(s => s.config.brokerId == id) - def bootstrapUrl = configs.map(c => c.hostName + ":" + c.port).mkString(",") + def bootstrapUrl = configs.map(c => { + val endpoint = c.listeners(SecurityProtocol.PLAINTEXT) + endpoint.host + ":" + endpoint.port + }).mkString(",") override def setUp() { super.setUp diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 35dc071..91c282b 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -22,8 +22,8 @@ import kafka.zk.ZooKeeperTestHarness import kafka.admin.AdminUtils import java.nio.ByteBuffer import junit.framework.Assert._ -import kafka.cluster.Broker -import kafka.utils.TestUtils +import kafka.cluster.{SecurityProtocol, Broker} +import kafka.utils.{Utils, TestUtils} import kafka.utils.TestUtils._ import kafka.server.{KafkaServer, KafkaConfig} import kafka.api.TopicMetadataRequest @@ -34,7 +34,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) val configs = props.map(p => new KafkaConfig(p)) private var server1: KafkaServer = null - val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port)) + val brokerEndPoints = configs.map(c => new Broker(c.brokerId,c.listeners).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) override def setUp() { super.setUp() @@ -67,7 +67,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val topic = "test" createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) - var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode) @@ -87,7 +87,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) // issue metadata request with empty list of topics - var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata", + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata", 2000, 0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) assertEquals(2, topicsMetadata.size) @@ -106,7 +106,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testAutoCreateTopic { // auto create topic val topic = "testAutoCreateTopic" - var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic", + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic", 2000,0).topicsMetadata assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode) assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size) @@ -118,7 +118,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0) // retry the metadata for the auto created topic - topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", + topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index c2dd8eb..9b1df44 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -37,7 +37,7 @@ class LogTest extends JUnitSuite { @Before def setUp() { logDir = TestUtils.tempDir() - val props = TestUtils.createBrokerConfig(0, -1) + val props = TestUtils.createBrokerConfig(0, 1) config = new KafkaConfig(props) } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 0af23ab..f5e7168 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -19,6 +19,8 @@ package kafka.network; import java.net._ import java.io._ +import kafka.cluster.SecurityProtocol.SecurityProtocol +import kafka.cluster.{SecurityProtocol, EndPoint} import org.junit._ import org.scalatest.junit.JUnitSuite import java.util.Random @@ -31,12 +33,17 @@ import kafka.message.ByteBufferMessageSet import java.nio.channels.SelectionKey import kafka.utils.TestUtils import scala.collection.Map +import scala.collection.JavaConversions._ class SocketServerTest extends JUnitSuite { + val ports = kafka.utils.TestUtils.choosePorts(2) + val plaintextPort = ports.head + val tracePort = ports.last + val server: SocketServer = new SocketServer(0, - host = null, - port = kafka.utils.TestUtils.choosePort, + Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, plaintextPort, SecurityProtocol.PLAINTEXT), + SecurityProtocol.TRACE -> EndPoint(null, tracePort, SecurityProtocol.TRACE)), numProcessorThreads = 1, maxQueuedRequests = 50, sendBufferSize = 300000, @@ -73,7 +80,10 @@ class SocketServerTest extends JUnitSuite { channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } - def connect(s:SocketServer = server) = new Socket("localhost", s.port) + + def connect(s:SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { + new Socket("localhost", server.endpoints.get(protocol).get.port) + } @After def cleanup() { @@ -81,7 +91,8 @@ class SocketServerTest extends JUnitSuite { } @Test def simpleRequest() { - val socket = connect() + val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) + val traceSocket = connect(protocol = SecurityProtocol.TRACE) val correlationId = -1 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs @@ -95,9 +106,15 @@ class SocketServerTest extends JUnitSuite { val serializedBytes = new Array[Byte](byteBuffer.remaining) byteBuffer.get(serializedBytes) - sendRequest(socket, 0, serializedBytes) + // Test PLAINTEXT socket + sendRequest(plainSocket, 0, serializedBytes) processRequest(server.requestChannel) - assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq) + assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq) + + // Test TRACE socket + sendRequest(traceSocket, 0, serializedBytes) + processRequest(server.requestChannel) + assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq) } @Test(expected = classOf[IOException]) @@ -129,21 +146,40 @@ class SocketServerTest extends JUnitSuite { "Socket key should be available for reads") } - @Test(expected = classOf[IOException]) + @Test def testSocketsCloseOnShutdown() { // open a connection - val socket = connect() + val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) + val traceSocket = connect(protocol = SecurityProtocol.TRACE) val bytes = new Array[Byte](40) // send a request first to make sure the connection has been picked up by the socket server - sendRequest(socket, 0, bytes) + sendRequest(plainSocket, 0, bytes) + sendRequest(traceSocket, 0, bytes) processRequest(server.requestChannel) + + // make sure the sockets are open + server.acceptors.values().map(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) // then shutdown the server server.shutdown() val largeChunkOfBytes = new Array[Byte](1000000) // doing a subsequent send should throw an exception as the connection should be closed. // send a large chunk of bytes to trigger a socket flush - sendRequest(socket, 0, largeChunkOfBytes) + try { + sendRequest(plainSocket, 0, largeChunkOfBytes) + fail("expected exception when writing to closed plain socket") + } catch { + case e: IOException => // expected + } + + try { + sendRequest(traceSocket, 0, largeChunkOfBytes) + fail("expected exception when writing to closed trace socket") + } catch { + case e: IOException => // expected + } + + } @Test @@ -161,8 +197,7 @@ class SocketServerTest extends JUnitSuite { val overrideNum = 6 val overrides: Map[String, Int] = Map("localhost" -> overrideNum) val overrideServer: SocketServer = new SocketServer(0, - host = null, - port = kafka.utils.TestUtils.choosePort, + Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, kafka.utils.TestUtils.choosePort, SecurityProtocol.PLAINTEXT)), numProcessorThreads = 1, maxQueuedRequests = 50, sendBufferSize = 300000, @@ -180,4 +215,5 @@ class SocketServerTest extends JUnitSuite { assertEquals(-1, conn.getInputStream.read()) overrideServer.shutdown() } + } diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 1db6ac3..450aa59 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -23,7 +23,7 @@ import junit.framework.Assert._ import org.easymock.EasyMock import org.junit.Test import kafka.api._ -import kafka.cluster.Broker +import kafka.cluster.{BrokerEndpoint, Broker} import kafka.common._ import kafka.message._ import kafka.producer.async._ @@ -163,8 +163,8 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - val broker1 = new Broker(0, "localhost", 9092) - val broker2 = new Broker(1, "localhost", 9093) + val broker1 = new BrokerEndpoint(0, "localhost", 9092) + val broker2 = new BrokerEndpoint(1, "localhost", 9093) // form expected partitions metadata val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2)) @@ -467,7 +467,7 @@ class AsyncProducerTest extends JUnit3Suite { } private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = { - val broker1 = new Broker(brokerId, brokerHost, brokerPort) + val broker1 = new BrokerEndpoint(brokerId, brokerHost, brokerPort) new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1)))) } diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index d60d8e0..fe456e6 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -21,6 +21,7 @@ import java.net.SocketTimeoutException import java.util.Properties import junit.framework.Assert import kafka.admin.AdminUtils +import kafka.cluster.SecurityProtocol import kafka.integration.KafkaServerTestHarness import kafka.message._ import kafka.server.KafkaConfig @@ -39,7 +40,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testReachableServer() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port + val props = TestUtils.getSyncProducerConfig(port) val producer = new SyncProducer(new SyncProducerConfig(props)) val firstStart = SystemTime.milliseconds @@ -74,7 +76,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testEmptyProduceRequest() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val port = server.socketServer.endpoints(SecurityProtocol.PLAINTEXT).port + val props = TestUtils.getSyncProducerConfig(port) val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId @@ -91,7 +94,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testMessageSizeTooLarge() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port + val props = TestUtils.getSyncProducerConfig(port) val producer = new SyncProducer(new SyncProducerConfig(props)) TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers) @@ -118,8 +122,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testMessageSizeTooLargeWithAckZero() { val server = servers.head + val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port + val props = TestUtils.getSyncProducerConfig(port) - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) props.put("request.required.acks", "0") val producer = new SyncProducer(new SyncProducerConfig(props)) @@ -145,7 +150,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testProduceCorrectlyReceivesResponse() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port + val props = TestUtils.getSyncProducerConfig(port) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -191,7 +197,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val timeoutMs = 500 val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port + val props = TestUtils.getSyncProducerConfig(port) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -217,7 +224,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testProduceRequestWithNoResponse() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port + val props = TestUtils.getSyncProducerConfig(port) val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs @@ -232,8 +240,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { def testNotEnoughReplicas() { val topicName = "minisrtest" val server = servers.head + val port = server.socketServer.endpoints.get(SecurityProtocol.PLAINTEXT).get.port + val props = TestUtils.getSyncProducerConfig(port) - val props = TestUtils.getSyncProducerConfig(server.socketServer.port) props.put("request.required.acks", "-1") val producer = new SyncProducer(new SyncProducerConfig(props)) diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index f0c4a56..d56503f 100644 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -17,6 +17,7 @@ package kafka.server +import kafka.cluster.SecurityProtocol import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import junit.framework.Assert._ @@ -31,9 +32,8 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { override def setUp() { super.setUp() val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) - props.put("advertised.host.name", advertisedHostName) - props.put("advertised.port", advertisedPort.toString) - + props.put("advertised.listeners", SecurityProtocol.PLAINTEXT.toString+"://"+advertisedHostName+":"+advertisedPort.toString) + server = TestUtils.createServer(new KafkaConfig(props)) } @@ -45,8 +45,9 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { def testBrokerAdvertiseToZK { val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId) - assertEquals(advertisedHostName, brokerInfo.get.host) - assertEquals(advertisedPort, brokerInfo.get.port) + val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get + assertEquals(advertisedHostName, endpoint.host) + assertEquals(advertisedPort, endpoint.port) } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 82dce80..1f93491 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -17,12 +17,13 @@ package kafka.server +import kafka.cluster.SecurityProtocol import org.junit.Test import junit.framework.Assert._ import org.scalatest.junit.JUnit3Suite -import kafka.utils.TestUtils import kafka.message.GZIPCompressionCodec import kafka.message.NoCompressionCodec +import kafka.utils.{Utils, TestUtils} class KafkaConfigTest extends JUnit3Suite { @@ -93,12 +94,13 @@ class KafkaConfigTest extends JUnit3Suite { val hostName = "fake-host" val props = TestUtils.createBrokerConfig(0, port) - props.put("host.name", hostName) + props.put("listeners", "PLAINTEXT://"+hostName+":"+port) val serverConfig = new KafkaConfig(props) - - assertEquals(serverConfig.advertisedHostName, hostName) - assertEquals(serverConfig.advertisedPort, port) + val endpoints = serverConfig.advertisedListeners + val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get + assertEquals(endpoint.host, hostName) + assertEquals(endpoint.port, port) } @Test @@ -108,13 +110,14 @@ class KafkaConfigTest extends JUnit3Suite { val advertisedPort = 1234 val props = TestUtils.createBrokerConfig(0, port) - props.put("advertised.host.name", advertisedHostName) - props.put("advertised.port", advertisedPort.toString) + props.put("advertised.listeners", "PLAINTEXT://"+advertisedHostName+":"+advertisedPort.toString) val serverConfig = new KafkaConfig(props) + val endpoints = serverConfig.advertisedListeners + val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get - assertEquals(serverConfig.advertisedHostName, advertisedHostName) - assertEquals(serverConfig.advertisedPort, advertisedPort) + assertEquals(endpoint.host, advertisedHostName) + assertEquals(endpoint.port, advertisedPort) } @Test diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index c2ba07c..01eefc0 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -23,7 +23,7 @@ import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager} -import kafka.cluster.Broker +import kafka.cluster.{SecurityProtocol, Broker} import kafka.common.ErrorMapping import kafka.api._ @@ -118,7 +118,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // start another controller val controllerId = 2 val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort())) - val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port)) + val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.listeners)) + val brokerEndPoints = brokers.map(b => b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) val controllerContext = new ControllerContext(zkClient, 6000) controllerContext.liveBrokers = brokers.toSet val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig) @@ -128,7 +129,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { leaderAndIsr.put((topic, partitionId), new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2)) val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, Set(0,1))).toMap - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokerEndPoints.toSet, controllerId, staleControllerEpoch, 0, "") controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index c06ee75..0476a59 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -197,7 +197,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { private def createBrokerConfig(nodeId: Int, port: Int): Properties = { val props = new Properties props.put("broker.id", nodeId.toString) - props.put("port", port.toString) + props.put("listeners", "PLAINTEXT://localhost:" + port.toString) props.put("log.dir", getLogDir.getAbsolutePath) props.put("log.flush.interval.messages", "1") props.put("enable.zookeeper", "false") diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 6ce1807..ec34b5f 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -34,7 +34,7 @@ import kafka.server._ import kafka.producer._ import kafka.message._ import kafka.api._ -import kafka.cluster.Broker +import kafka.cluster.{SecurityProtocol, Broker} import kafka.consumer.{KafkaStream, ConsumerConfig} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition @@ -147,7 +147,10 @@ object TestUtils extends Logging { } def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = { - configs.map(c => formatAddress(c.hostName, c.port)).mkString(",") + configs.map(c => { + val endpoint = c.listeners.get(SecurityProtocol.PLAINTEXT).get + formatAddress(endpoint.host, endpoint.port) + }).mkString(",") } /** @@ -158,8 +161,7 @@ object TestUtils extends Logging { enableDeleteTopic: Boolean = false): Properties = { val props = new Properties if (nodeId >= 0) props.put("broker.id", nodeId.toString) - props.put("host.name", "localhost") - props.put("port", port.toString) + props.put("listeners", "PLAINTEXT://localhost:"+port.toString) props.put("log.dir", TestUtils.tempDir().getAbsolutePath) props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") @@ -451,13 +453,13 @@ object TestUtils extends Logging { } def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost", 6667)) - brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, 6000, jmxPort = -1)) + val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT)) + brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, "localhost", 6667, b.endPoints, 6000, jmxPort = -1)) brokers } def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost", 6667)) + val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT)) brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b)) brokers } diff --git a/system_test/replication_testsuite/testcase_1/testcase_1_properties.json b/system_test/replication_testsuite/testcase_1/testcase_1_properties.json index 0c6d7a3..680213f 100644 --- a/system_test/replication_testsuite/testcase_1/testcase_1_properties.json +++ b/system_test/replication_testsuite/testcase_1/testcase_1_properties.json @@ -32,7 +32,7 @@ "entity_id": "1", "port": "9091", "broker.id": "1", - "log.segment.bytes": "20480", + "log.segment.bytes": "10000000", "log.dir": "/tmp/kafka_server_1_logs", "log_filename": "kafka_server_9091.log", "config_filename": "kafka_server_9091.properties" @@ -41,7 +41,7 @@ "entity_id": "2", "port": "9092", "broker.id": "2", - "log.segment.bytes": "20480", + "log.segment.bytes": "10000000", "log.dir": "/tmp/kafka_server_2_logs", "log_filename": "kafka_server_9092.log", "config_filename": "kafka_server_9092.properties" @@ -50,7 +50,7 @@ "entity_id": "3", "port": "9093", "broker.id": "3", - "log.segment.bytes": "20480", + "log.segment.bytes": "10000000", "log.dir": "/tmp/kafka_server_3_logs", "log_filename": "kafka_server_9093.log", "config_filename": "kafka_server_9093.properties" diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 41d511c..a9b73f7 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -436,6 +436,7 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true" + addedCSVConfig["listeners"] = "PLAINTEXT://localhost:"+tcCfg["port"] if brokerVersion == "0.7": addedCSVConfig["brokerid"] = tcCfg["brokerid"]