diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index ee3dff5..5239538 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -33,6 +33,7 @@ object OffsetRequest {
 
   def readFrom(buffer: ByteBuffer): OffsetRequest = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val replicaId = buffer.getInt
     val topicCount = buffer.getInt
@@ -54,16 +55,18 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
 
 case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
                          versionId: Short = OffsetRequest.CurrentVersion,
+                         correlationId: Int = 0,
                          clientId: String = OffsetRequest.DefaultClientId,
                          replicaId: Int = Request.OrdinaryConsumerId)
         extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
 
-  def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, replicaId)
+  def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(replicaId)
@@ -83,6 +86,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
 
   def sizeInBytes =
     2 + /* versionId */
+    4 + /* correlationId */
     shortStringLength(clientId) +
     4 + /* replicaId */
     4 + /* topic count */
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index e2d03e8..1e50f68 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -21,7 +21,7 @@ import kafka.cluster.Broker
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.utils.Logging
-import collection.mutable.ListBuffer
+import collection.mutable.ArrayBuffer
 import kafka.common.{KafkaException, ErrorMapping}
 
 /**
@@ -31,11 +31,11 @@ import kafka.common.{KafkaException, ErrorMapping}
  * partition id (4 bytes)
  *
  * does leader exist (1 byte)
- * leader info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ * leader info (4 + host.length + 4 (port) + 4 (id))
  * number of replicas (2 bytes)
- * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ * replica info (4 + host.length + 4 (port) + 4 (id))
 * number of in sync replicas (2 bytes)
- * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ * replica info (4 + host.length + 4 (port) + 4 (id))
 *
 * does log metadata exist (1 byte)
 * number of log segments (4 bytes)
@@ -48,30 +48,26 @@ import kafka.common.{KafkaException, ErrorMapping}
 *
 */
 
-sealed trait LeaderRequest { def requestId: Byte }
-case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 }
-case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
-
 object TopicMetadata {
+
+  val NoLeaderNodeId = -1
 
-  def readFrom(buffer: ByteBuffer): TopicMetadata = {
+  def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): TopicMetadata = {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
-    val partitionsMetadata = new ListBuffer[PartitionMetadata]()
+    val partitionsMetadata = new ArrayBuffer[PartitionMetadata]()
     for(i <- 0 until numPartitions)
-      partitionsMetadata += PartitionMetadata.readFrom(buffer)
+      partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers)
     new TopicMetadata(topic, partitionsMetadata, errorCode)
   }
 }
 
 case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging {
   def sizeInBytes: Int = {
-    var size: Int = 2 /* error code */
-    size += shortStringLength(topic)
-    size += partitionsMetadata.foldLeft(4 /* number of partitions */)(_ + _.sizeInBytes)
-    debug("Size of topic metadata = " + size)
-    size
+    2 /* error code */ +
+      shortStringLength(topic) +
+      4 + partitionsMetadata.map(_.sizeInBytes).sum /* size and partition data array */
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -87,40 +83,24 @@ case class TopicMetadata(topic: String, partitionsMetadat
 
 object PartitionMetadata {
 
-  def readFrom(buffer: ByteBuffer): PartitionMetadata = {
+  def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): PartitionMetadata = {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
-    val doesLeaderExist = getLeaderRequest(buffer.get)
-    val leader = doesLeaderExist match {
-      case LeaderExists => /* leader exists */
-        Some(Broker.readFrom(buffer))
-      case LeaderDoesNotExist => None
-    }
+    val leaderId = buffer.getInt
+    val leader = brokers.get(leaderId)
 
     /* list of all replicas */
-    val numReplicas = readShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
-    val replicas = new Array[Broker](numReplicas)
-    for(i <- 0 until numReplicas) {
-      replicas(i) = Broker.readFrom(buffer)
-    }
+    val numReplicas = readIntInRange(buffer, "number of all replicas", (0, Int.MaxValue))
+    val replicaIds = (0 until numReplicas).map(_ => buffer.getInt)
+    val replicas = replicaIds.map(brokers)
 
     /* list of in-sync replicas */
-    val numIsr = readShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
-    val isr = new Array[Broker](numIsr)
-    for(i <- 0 until numIsr) {
-      isr(i) = Broker.readFrom(buffer)
-    }
+    val numIsr = readIntInRange(buffer, "number of in-sync replicas", (0, Int.MaxValue))
+    val isrIds = (0 until numIsr).map(_ => buffer.getInt)
+    val isr = isrIds.map(brokers)
 
     new PartitionMetadata(partitionId, leader, replicas, isr, errorCode)
   }
-
-  private def getLeaderRequest(requestId: Byte): LeaderRequest = {
-    requestId match {
-      case LeaderExists.requestId => LeaderExists
-      case LeaderDoesNotExist.requestId => LeaderDoesNotExist
-      case _ => throw new KafkaException("Unknown leader request id " + requestId)
-    }
-  }
 }
 
 case class PartitionMetadata(partitionId: Int,
@@ -129,42 +109,28 @@ case class PartitionMetadata(partitionId: Int,
                              isr: Seq[Broker] = Seq.empty,
                              errorCode: Short = ErrorMapping.NoError) extends Logging {
   def sizeInBytes: Int = {
-    var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/
-
-    leader match {
-      case Some(l) => size += l.sizeInBytes
-      case None =>
-    }
-
-    size += 2 /* number of replicas */
-    size += replicas.foldLeft(0)(_ + _.sizeInBytes)
-    size += 2 /* number of in sync replicas */
-    size += isr.foldLeft(0)(_ + _.sizeInBytes)
-
-    debug("Size of partition metadata = " + size)
-    size
+    2 /* error code */ +
+      4 /* partition id */ +
+      4 /* leader */ +
+      4 + 4 * replicas.size /* replica array */ +
+      4 + 4 * isr.size /* isr array */
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(errorCode)
     buffer.putInt(partitionId)
-    /* if leader exists*/
-    leader match {
-      case Some(l) =>
-        buffer.put(LeaderExists.requestId)
-        /* leader id host_name port */
-        l.writeTo(buffer)
-      case None => buffer.put(LeaderDoesNotExist.requestId)
-    }
+    /* leader */
+    val leaderId = if(leader.isDefined) leader.get.id else TopicMetadata.NoLeaderNodeId
+    buffer.putInt(leaderId)
 
     /* number of replicas */
-    buffer.putShort(replicas.size.toShort)
-    replicas.foreach(r => r.writeTo(buffer))
+    buffer.putInt(replicas.size)
+    replicas.foreach(r => buffer.putInt(r.id))
 
     /* number of in-sync replicas */
-    buffer.putShort(isr.size.toShort)
-    isr.foreach(r => r.writeTo(buffer))
+    buffer.putInt(isr.size)
+    isr.foreach(r => buffer.putInt(r.id))
   }
 }
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 70c42e3..17d272f 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -33,6 +33,7 @@ object TopicMetadataRequest extends Logging {
 
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue))
     val topics = new ListBuffer[String]()
@@ -40,26 +41,28 @@ object TopicMetadataRequest extends Logging {
       topics += readShortString(buffer)
     val topicsList = topics.toList
     debug("topic = %s".format(topicsList.head))
-    new TopicMetadataRequest(versionId, clientId, topics.toList)
+    new TopicMetadataRequest(versionId, clientId, topics.toList, correlationId)
   }
 }
 
 case class TopicMetadataRequest(val versionId: Short,
                                 val clientId: String,
-                                val topics: Seq[String])
+                                val topics: Seq[String],
+                                val correlationId: Int)
  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
   def this(topics: Seq[String]) =
-    this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics)
+    this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, 0)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId) // correlation id not set yet
     writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
   }
 
   def sizeInBytes(): Int = {
-    2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
+    2 + 4 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
   }
 }
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index 25068d1..902285b 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -17,30 +17,46 @@
 
 package kafka.api
 
+import kafka.cluster.Broker
 import java.nio.ByteBuffer
 
 object TopicMetadataResponse {
 
   def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val brokerCount = buffer.getInt
+    val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer))
+    val brokerMap = brokers.map(b => (b.id, b)).toMap
     val topicCount = buffer.getInt
-    val topicsMetadata = new Array[TopicMetadata](topicCount)
-    for( i <- 0 until topicCount) {
-      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
-    }
-    new TopicMetadataResponse(versionId, topicsMetadata.toSeq)
+    val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
+    new TopicMetadataResponse(versionId, topicsMetadata, correlationId)
   }
 }
 
 case class TopicMetadataResponse(versionId: Short,
-                                 topicsMetadata: Seq[TopicMetadata]) extends RequestOrResponse
-{
-  val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes)
+                                 topicsMetadata: Seq[TopicMetadata],
+                                 correlationId: Int) extends RequestOrResponse {
+  val sizeInBytes: Int = {
+    val brokers = extractBrokers(topicsMetadata).values
+    2 + 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
+  }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId) /* correlation id not set, yet */
+    /* brokers */
+    val brokers = extractBrokers(topicsMetadata).values
+    buffer.putInt(brokers.size)
+    brokers.foreach(_.writeTo(buffer))
     /* topic metadata */
     buffer.putInt(topicsMetadata.length)
     topicsMetadata.foreach(_.writeTo(buffer))
   }
+
+  def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
+    val parts = topicsMetadata.flatMap(_.partitionsMetadata)
+    val brokers = parts.flatMap(_.replicas) ++ parts.map(_.leader).collect{case Some(l) => l}
+    brokers.map(b => (b.id, b)).toMap
+  }
 }
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index aeead2d..0195312 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -1,6 +1,7 @@
 package kafka.client
 
 import scala.collection._
+import java.net.InetSocketAddress
 import kafka.cluster._
 import kafka.api._
 import kafka.producer._
@@ -12,14 +13,15 @@ import kafka.utils.{Utils, Logging}
  */
 object ClientUtils extends Logging{
 
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[InetSocketAddress]): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
     val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     while(i < brokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
+      val address = brokers(i)
+      val producer: SyncProducer = ProducerPool.createSyncProducer(None, address.getHostName, address.getPort)
       info("Fetching metadata for topic %s".format(topics))
       try {
         topicMetadataResponse = producer.send(topicMetadataRequest)
@@ -43,18 +45,13 @@ object ClientUtils extends Logging{
 
   /**
    * Parse a list of broker urls in the form host1:port1, host2:port2, ...
    */
-  def parseBrokerList(brokerListStr: String): Seq[Broker] = {
-    val brokersStr = Utils.parseCsvList(brokerListStr)
-
-    brokersStr.zipWithIndex.map(b =>{
-      val brokerStr = b._1
-      val brokerId = b._2
-      val brokerInfos = brokerStr.split(":")
-      val hostName = brokerInfos(0)
-      val port = brokerInfos(1).toInt
-      val creatorId = hostName + "-" + System.currentTimeMillis()
-      new Broker(brokerId, creatorId, hostName, port)
-    })
+  def parseBrokerList(brokerListStr: String): Seq[InetSocketAddress] = {
+    Utils.parseCsvList(brokerListStr).map{url =>
+      val pieces = url.split(":")
+      if(pieces == null || pieces.length != 2)
+        throw new IllegalArgumentException("Malformed URL: " + url)
+      new InetSocketAddress(pieces(0), pieces(1).toInt)
+    }
   }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 03a75f0..7c09004 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -31,33 +31,31 @@ private[kafka] object Broker {
     if(brokerInfoString == null)
       throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
     val brokerInfo = brokerInfoString.split(":")
-    new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
+    new Broker(id, brokerInfo(0), brokerInfo(1).toInt)
   }
 
   def readFrom(buffer: ByteBuffer): Broker = {
     val id = buffer.getInt
-    val creatorId = readShortString(buffer)
     val host = readShortString(buffer)
     val port = buffer.getInt
-    new Broker(id, creatorId, host, port)
+    new Broker(id, host, port)
   }
 }
 
-private[kafka] case class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
+private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
 
-  override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
+  override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
 
-  def getZKString(): String = new String(creatorId + ":" + host + ":" + port)
+  def getZKString(): String = new String(host + ":" + port)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)
-    writeShortString(buffer, creatorId)
     writeShortString(buffer, host)
     buffer.putInt(port)
   }
 
   def sizeInBytes: Int = {
-    val size = shortStringLength(creatorId) + shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
+    val size = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
     debug("Size of broker info = " + size)
     size
   }
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index bdc020b..476ab5a 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -23,6 +23,7 @@ import kafka.cluster.{Cluster, Broker}
 import scala.collection.immutable
 import collection.mutable.HashMap
 import scala.collection.mutable
+import java.net.InetSocketAddress
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
@@ -53,7 +54,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 
       try {
         trace("Partitions without leader %s".format(noLeaderPartitionSet))
-        val brokers = getAllBrokersInCluster(zkClient)
+        val brokers = getAllBrokersInCluster(zkClient).map(b => new InetSocketAddress(b.host, b.port))
         val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata
         val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
         topicsMetadata.foreach(
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index d642a67..efb2fb5 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -30,7 +30,10 @@ import kafka.cluster.Broker
 
 object SimpleConsumer extends Logging {
 
-  def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long,
+  def earliestOrLatestOffset(broker: Broker,
+                             topic: String,
+                             partitionId: Int,
+                             earliestOrLatest: Long,
                              isFromOrdinaryConsumer: Boolean): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
@@ -42,7 +45,7 @@ object SimpleConsumer extends Logging {
         new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
       else
         new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
-                          Request.DebuggingConsumerId)
+                          0, Request.DebuggingConsumerId)
       producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     } catch {
       case e =>
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 227c90d..0a24e8f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -19,6 +19,7 @@ package kafka.consumer
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
+import java.net.InetSocketAddress
 import locks.ReentrantLock
 import scala.collection._
 import kafka.cluster._
@@ -398,7 +399,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private def rebalance(cluster: Cluster): Boolean = {
     val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
     val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
-    val brokers = getAllBrokersInCluster(zkClient)
+    val brokers = getAllBrokersInCluster(zkClient).map(b => new InetSocketAddress(b.host, b.port))
     val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
     val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
     val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index ad98b75..dbf04fd 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -20,14 +20,15 @@ import kafka.api._
 import java.nio.ByteBuffer
 import scala.collection.JavaConversions
 
-class TopicMetadataRequest(val versionId: Short,
+class TopicMetadataRequest(val correlationId: Int,
+                           val versionId: Short,
                            val clientId: String,
                            val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
 
   val underlying: kafka.api.TopicMetadataRequest =
-    new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics))
+    new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics), correlationId)
 
   def this(topics: java.util.List[String]) =
-    this(kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+    this(0, kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
 
   def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 2c9f2d1..30b1dc3 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -33,7 +33,7 @@ class ProducerConfig private (val props: VerifiableProperties)
   /** This is for bootstrapping and the producer will only use it for getting metadata
    * (topics, partitions and replicas). The socket connections for sending the actual data
    * will be established based on the broker information returned in the metadata. The
-   * format is host1:por1,host2:port2, and the list can be a subset of brokers or
+   * format is host1:port1,host2:port2, and the list can be a subset of brokers or
    * a VIP pointing to a subset of brokers.
    */
   val brokerList = props.getString("broker.list")
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index eb8ead3..64a85ef 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -20,6 +20,7 @@ package kafka.producer
 import kafka.cluster.Broker
 import java.util.Properties
 import collection.mutable.HashMap
+import java.net.InetSocketAddress
 import java.lang.Object
 import kafka.utils.Logging
 import kafka.api.TopicMetadata
@@ -27,10 +28,10 @@ import kafka.common.UnavailableProducerException
 
 object ProducerPool{
 
-  def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): SyncProducer = {
+  def createSyncProducer(configOpt: Option[ProducerConfig], host: String, port: Int): SyncProducer = {
     val props = new Properties()
-    props.put("host", broker.host)
-    props.put("port", broker.port.toString)
+    props.put("host", host)
+    props.put("port", port.toString)
     if(configOpt.isDefined)
       props.putAll(configOpt.get.props.props)
     new SyncProducer(new SyncProducerConfig(props))
@@ -53,9 +54,10 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
     newBrokers.foreach(b => {
       if(syncProducers.contains(b.id)){
         syncProducers(b.id).close()
-        syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
-      } else
-        syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
+        syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b.host, b.port))
+      } else {
+        syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b.host, b.port))
+      }
     })
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a14e0a2..9524f67 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -96,7 +96,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val topicMeatadata = apiRequest.topics.map {
             topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
           }
-          val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata)
+          val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata, apiRequest.correlationId)
           requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
           error("error when handling request %s".format(apiRequest), e)
         case RequestKeys.LeaderAndIsrKey =>
@@ -454,7 +454,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
     })
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
-    val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
+    val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 67a0be8..e1c11f2 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -43,8 +43,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
   private def registerBrokerInZk() {
     info("Registering broker " + brokerIdPath)
     val hostName = config.hostName
-    val creatorId = hostName + "-" + System.currentTimeMillis
-    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
+    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4333401..8c2bf39 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -180,9 +180,9 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
-    val broker = new Broker(id, creator, host, port)
+    val broker = new Broker(id, host, port)
     try {
       createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
     } catch {
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 531f32e..9a442c6 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -75,10 +75,11 @@ object SerializationTestUtils{
     TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
   )
 
-  private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
-  private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
-  private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
-  private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013))
+  private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0)
+  private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1)
+  private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2)
+  private val partitionMetaData3 = new PartitionMetadata(3, Some(brokers.head), replicas = brokers, isr = brokers.tail.tail, errorCode = 3)
   private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
   private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
@@ -137,11 +138,11 @@ object SerializationTestUtils{
   }
 
   def createTestTopicMetadataRequest: TopicMetadataRequest = {
-    new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
+    new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2), 1)
   }
 
   def createTestTopicMetadataResponse: TopicMetadataResponse = {
-    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
+    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2), 1)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 962d5f9..22e4f7d 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -47,7 +47,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
   val group = "group1"
   val consumer0 = "consumer0"
   val consumedOffset = 5
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
                                                            c.brokerId,
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index df754cc..cc27a94 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -41,7 +41,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 19f4c3b..d771d1e 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -165,8 +165,8 @@ class AsyncProducerTest extends JUnit3Suite {
     val props = new Properties()
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
 
-    val broker1 = new Broker(0, "localhost", "localhost", 9092)
-    val broker2 = new Broker(1, "localhost", "localhost", 9093)
+    val broker1 = new Broker(0, "localhost", 9092)
+    val broker2 = new Broker(1, "localhost", 9093)
     broker1
     // form expected partitions metadata
     val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
@@ -491,7 +491,7 @@ class AsyncProducerTest extends JUnit3Suite {
   }
 
   private def getTopicMetadata(topic: String,
                                partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
+    val broker1 = new Broker(brokerId, brokerHost, brokerPort)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 19b90a8..fcdd26e 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -123,7 +123,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // start another controller
    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
-    val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
     val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
     controllerChannelManager.startup()
     val staleControllerEpoch = 0
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3d4f3f2..cf48ccb 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -333,13 +333,13 @@ object TestUtils extends Logging {
   }
 
   def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667))
-    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.creatorId, b.port))
+    val brokers = ids.map(id => new Broker(id, "localhost", 6667))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port))
     brokers
   }
 
   def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667))
+    val brokers = ids.map(id => new Broker(id, "localhost", 6667))
     brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b))
     brokers
   }