diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index f57de6e..7459f4a 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -87,6 +87,7 @@ object LeaderAndIsrRequest { def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = { val versionId = buffer.getShort + val correlationId = buffer.getInt val clientId = readShortString(buffer) val ackTimeoutMs = buffer.getInt val controllerEpoch = buffer.getInt @@ -106,11 +107,12 @@ object LeaderAndIsrRequest { for (i <- 0 until leadersCount) leaders += Broker.readFrom(buffer) - new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch) + new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch) } } case class LeaderAndIsrRequest (versionId: Short, + correlationId: Int, clientId: String, ackTimeoutMs: Int, partitionStateInfos: Map[(String, Int), PartitionStateInfo], @@ -119,12 +121,13 @@ case class LeaderAndIsrRequest (versionId: Short, extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) { def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = { - this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, + this(LeaderAndIsrRequest.CurrentVersion, 0, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos, liveBrokers, controllerEpoch) } def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) + buffer.putInt(correlationId) writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) buffer.putInt(controllerEpoch) @@ -141,6 +144,7 @@ case class LeaderAndIsrRequest (versionId: Short, def sizeInBytes(): Int = { var size = 2 /* version id */ + + 4 /* correlation id */ + (2 + clientId.length) /* client id */ + 4 /* ack timeout */ + 4 /* controller epoch */ + diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala index f2e86be..c8f1630 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala @@ -27,6 +27,7 @@ import collection.Map object LeaderAndIsrResponse { def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = { val versionId = buffer.getShort + val correlationId = buffer.getInt val errorCode = buffer.getShort val numEntries = buffer.getInt val responseMap = new HashMap[(String, Int), Short]() @@ -36,18 +37,20 @@ object LeaderAndIsrResponse { val partitionErrorCode = buffer.getShort responseMap.put((topic, partition), partitionErrorCode) } - new LeaderAndIsrResponse(versionId, responseMap, errorCode) + new LeaderAndIsrResponse(versionId, correlationId, responseMap, errorCode) } } case class LeaderAndIsrResponse(versionId: Short, + correlationId: Int, responseMap: Map[(String, Int), Short], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse { def sizeInBytes(): Int ={ var size = 2 /* version id */ + + 4 /* correlation id */ + 2 /* error code */ + 4 /* number of responses */ for ((key, value) <- responseMap) { @@ -61,6 +64,7 @@ case class LeaderAndIsrResponse(versionId: Short, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) + buffer.putInt(correlationId) buffer.putShort(errorCode) buffer.putInt(responseMap.size) for 
((key:(String, Int), value) <- responseMap){ diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index ee3dff5..5239538 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -33,6 +33,7 @@ object OffsetRequest { def readFrom(buffer: ByteBuffer): OffsetRequest = { val versionId = buffer.getShort + val correlationId = buffer.getInt val clientId = readShortString(buffer) val replicaId = buffer.getInt val topicCount = buffer.getInt @@ -54,16 +55,18 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int) case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], versionId: Short = OffsetRequest.CurrentVersion, + correlationId: Int = 0, clientId: String = OffsetRequest.DefaultClientId, replicaId: Int = Request.OrdinaryConsumerId) extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) { - def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, replicaId) + def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId) lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) + buffer.putInt(correlationId) writeShortString(buffer, clientId) buffer.putInt(replicaId) @@ -83,6 +86,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ def sizeInBytes = 2 + /* versionId */ + 4 + /* correlationId */ shortStringLength(clientId) + 4 + /* replicaId */ 4 + /* topic count */ diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala index 10a7715..7818b66 100644 --- a/core/src/main/scala/kafka/api/OffsetResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetResponse.scala @@ -26,6 +26,7 @@ object OffsetResponse { def readFrom(buffer: ByteBuffer): OffsetResponse = { val versionId = buffer.getShort + val correlationId = buffer.getInt val numTopics = buffer.getInt val pairs = (1 to numTopics).flatMap(_ => { val topic = readShortString(buffer) @@ -38,7 +39,7 @@ object OffsetResponse { (TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets)) }) }) - OffsetResponse(versionId, Map(pairs:_*)) + OffsetResponse(versionId, correlationId, Map(pairs:_*)) } } @@ -48,6 +49,7 @@ case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) case class OffsetResponse(versionId: Short, + correlationId: Int, partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse]) extends RequestOrResponse { @@ -57,6 +59,7 @@ case class OffsetResponse(versionId: Short, val sizeInBytes = { 2 + /* versionId */ + 4 + /* correlation id */ 4 + /* topic count */ offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { val (topic, errorAndOffsetsMap) = currTopic @@ -75,6 +78,7 @@ case class OffsetResponse(versionId: Short, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) + buffer.putInt(correlationId) buffer.putInt(offsetsGroupedByTopic.size) // topic count offsetsGroupedByTopic.foreach { case((topic, errorAndOffsetsMap)) => diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 9088fa9..6583d64 100644 --- 
a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -31,6 +31,7 @@ object StopReplicaRequest extends Logging { def readFrom(buffer: ByteBuffer): StopReplicaRequest = { val versionId = buffer.getShort + val correlationId = buffer.getInt val clientId = readShortString(buffer) val ackTimeoutMs = buffer.getInt val controllerEpoch = buffer.getInt @@ -45,11 +46,12 @@ object StopReplicaRequest extends Logging { (1 to topicPartitionPairCount) foreach { _ => topicPartitionPairSet.add(readShortString(buffer), buffer.getInt) } - StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch) + StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch) } } case class StopReplicaRequest(versionId: Short, + correlationId: Int, clientId: String, ackTimeoutMs: Int, deletePartitions: Boolean, @@ -58,12 +60,13 @@ case class StopReplicaRequest(versionId: Short, extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) { def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = { - this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, + this(StopReplicaRequest.CurrentVersion, 0, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, deletePartitions, partitions, controllerEpoch) } def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) + buffer.putInt(correlationId) writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) buffer.putInt(controllerEpoch) @@ -78,6 +81,7 @@ case class StopReplicaRequest(versionId: Short, def sizeInBytes(): Int = { var size = 2 + /* versionId */ + 4 + /* correlation id */ ApiUtils.shortStringLength(clientId) + 4 + /* ackTimeoutMs */ 4 + /* controller epoch */ diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala index e0d3de6..6062a0c 100644 --- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala +++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala @@ -27,6 +27,7 @@ import kafka.api.ApiUtils._ object StopReplicaResponse { def readFrom(buffer: ByteBuffer): StopReplicaResponse = { val versionId = buffer.getShort + val correlationId = buffer.getInt val errorCode = buffer.getShort val numEntries = buffer.getInt @@ -37,17 +38,19 @@ object StopReplicaResponse { val partitionErrorCode = buffer.getShort() responseMap.put((topic, partition), partitionErrorCode) } - new StopReplicaResponse(versionId, responseMap.toMap, errorCode) + new StopReplicaResponse(versionId, correlationId, responseMap.toMap, errorCode) } } case class StopReplicaResponse(val versionId: Short, + val correlationId: Int, val responseMap: Map[(String, Int), Short], val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{ def sizeInBytes(): Int ={ var size = 2 /* version id */ + + 4 /* correlation id */ + 2 /* error code */ + 4 /* number of responses */ for ((key, value) <- responseMap) { @@ -61,6 +64,7 @@ case class StopReplicaResponse(val versionId: Short, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) + buffer.putInt(correlationId) buffer.putShort(errorCode) buffer.putInt(responseMap.size) for ((key:(String, Int), value) <- responseMap){ diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala index e2d03e8..409de76 100644 --- 
a/core/src/main/scala/kafka/api/TopicMetadata.scala +++ b/core/src/main/scala/kafka/api/TopicMetadata.scala @@ -21,57 +21,29 @@ import kafka.cluster.Broker import java.nio.ByteBuffer import kafka.api.ApiUtils._ import kafka.utils.Logging -import collection.mutable.ListBuffer -import kafka.common.{KafkaException, ErrorMapping} - -/** - * topic (2 bytes + topic.length) - * number of partitions (4 bytes) - * - * partition id (4 bytes) - * - * does leader exist (1 byte) - * leader info (4 + creator.length + host.length + 4 (port) + 4 (id)) - * number of replicas (2 bytes) - * replica info (4 + creator.length + host.length + 4 (port) + 4 (id)) - * number of in sync replicas (2 bytes) - * replica info (4 + creator.length + host.length + 4 (port) + 4 (id)) - * - * does log metadata exist (1 byte) - * number of log segments (4 bytes) - * total size of log in bytes (8 bytes) - * - * number of log segments (4 bytes) - * beginning offset (8 bytes) - * last modified timestamp (8 bytes) - * size of log segment (8 bytes) - * - */ - -sealed trait LeaderRequest { def requestId: Byte } -case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 } -case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 } +import collection.mutable.ArrayBuffer +import kafka.common._ object TopicMetadata { + + val NoLeaderNodeId = -1 - def readFrom(buffer: ByteBuffer): TopicMetadata = { + def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): TopicMetadata = { val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) val topic = readShortString(buffer) val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue)) - val partitionsMetadata = new ListBuffer[PartitionMetadata]() + val partitionsMetadata = new ArrayBuffer[PartitionMetadata]() for(i <- 0 until numPartitions) - partitionsMetadata += PartitionMetadata.readFrom(buffer) + partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers) new TopicMetadata(topic, partitionsMetadata, errorCode) } } case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging { def sizeInBytes: Int = { - var size: Int = 2 /* error code */ - size += shortStringLength(topic) - size += partitionsMetadata.foldLeft(4 /* number of partitions */)(_ + _.sizeInBytes) - debug("Size of topic metadata = " + size) - size + 2 /* error code */ + + shortStringLength(topic) + + 4 + partitionsMetadata.map(_.sizeInBytes).sum /* size and partition data array */ } def writeTo(buffer: ByteBuffer) { @@ -87,40 +59,24 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat object PartitionMetadata { - def readFrom(buffer: ByteBuffer): PartitionMetadata = { + def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): PartitionMetadata = { val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */ - val doesLeaderExist = getLeaderRequest(buffer.get) - val leader = doesLeaderExist match { - case LeaderExists => /* leader exists */ - Some(Broker.readFrom(buffer)) - case LeaderDoesNotExist => None - } + val leaderId = buffer.getInt + val leader = brokers.get(leaderId) /* list of all replicas */ - val numReplicas = readShortInRange(buffer, "number of all replicas", (0, Short.MaxValue)) - val replicas = new Array[Broker](numReplicas) - for(i <- 0 until numReplicas) { - replicas(i) = Broker.readFrom(buffer) - } + val 
numReplicas = readIntInRange(buffer, "number of all replicas", (0, Int.MaxValue)) + val replicaIds = (0 until numReplicas).map(_ => buffer.getInt) + val replicas = replicaIds.map(brokers) /* list of in-sync replicas */ - val numIsr = readShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue)) - val isr = new Array[Broker](numIsr) - for(i <- 0 until numIsr) { - isr(i) = Broker.readFrom(buffer) - } + val numIsr = readIntInRange(buffer, "number of in-sync replicas", (0, Int.MaxValue)) + val isrIds = (0 until numIsr).map(_ => buffer.getInt) + val isr = isrIds.map(brokers) new PartitionMetadata(partitionId, leader, replicas, isr, errorCode) } - - private def getLeaderRequest(requestId: Byte): LeaderRequest = { - requestId match { - case LeaderExists.requestId => LeaderExists - case LeaderDoesNotExist.requestId => LeaderDoesNotExist - case _ => throw new KafkaException("Unknown leader request id " + requestId) - } - } } case class PartitionMetadata(partitionId: Int, @@ -129,42 +85,28 @@ case class PartitionMetadata(partitionId: Int, isr: Seq[Broker] = Seq.empty, errorCode: Short = ErrorMapping.NoError) extends Logging { def sizeInBytes: Int = { - var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/ - - leader match { - case Some(l) => size += l.sizeInBytes - case None => - } - - size += 2 /* number of replicas */ - size += replicas.foldLeft(0)(_ + _.sizeInBytes) - size += 2 /* number of in sync replicas */ - size += isr.foldLeft(0)(_ + _.sizeInBytes) - - debug("Size of partition metadata = " + size) - size + 2 /* error code */ + + 4 /* partition id */ + + 4 /* leader */ + + 4 + 4 * replicas.size /* replica array */ + + 4 + 4 * isr.size /* isr array */ } def writeTo(buffer: ByteBuffer) { buffer.putShort(errorCode) buffer.putInt(partitionId) - /* if leader exists*/ - leader match { - case Some(l) => - buffer.put(LeaderExists.requestId) - /* leader id host_name port */ - l.writeTo(buffer) - case None => buffer.put(LeaderDoesNotExist.requestId) - } + /* leader */ + val leaderId = if(leader.isDefined) leader.get.id else TopicMetadata.NoLeaderNodeId + buffer.putInt(leaderId) /* number of replicas */ - buffer.putShort(replicas.size.toShort) - replicas.foreach(r => r.writeTo(buffer)) + buffer.putInt(replicas.size) + replicas.foreach(r => buffer.putInt(r.id)) /* number of in-sync replicas */ - buffer.putShort(isr.size.toShort) - isr.foreach(r => r.writeTo(buffer)) + buffer.putInt(isr.size) + isr.foreach(r => buffer.putInt(r.id)) } } diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 70c42e3..17d272f 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -33,6 +33,7 @@ object TopicMetadataRequest extends Logging { def readFrom(buffer: ByteBuffer): TopicMetadataRequest = { val versionId = buffer.getShort + val correlationId = buffer.getInt val clientId = readShortString(buffer) val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue)) val topics = new ListBuffer[String]() @@ -40,26 +41,28 @@ object TopicMetadataRequest extends Logging { topics += readShortString(buffer) val topicsList = topics.toList debug("topic = %s".format(topicsList.head)) - new TopicMetadataRequest(versionId, clientId, topics.toList) + new TopicMetadataRequest(versionId, clientId, topics.toList, correlationId) } } case class TopicMetadataRequest(val versionId: Short, val clientId: String, - val topics: 
Seq[String]) + val topics: Seq[String], + val correlationId: Int) extends RequestOrResponse(Some(RequestKeys.MetadataKey)){ def this(topics: Seq[String]) = - this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics) + this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, 0) def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) + buffer.putInt(correlationId) // correlation id not set yet writeShortString(buffer, clientId) buffer.putInt(topics.size) topics.foreach(topic => writeShortString(buffer, topic)) } def sizeInBytes(): Int = { - 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ + 2 + 4 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ } } diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala index 25068d1..0631201 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala @@ -17,30 +17,46 @@ package kafka.api +import kafka.cluster.Broker import java.nio.ByteBuffer object TopicMetadataResponse { def readFrom(buffer: ByteBuffer): TopicMetadataResponse = { val versionId = buffer.getShort + val correlationId = buffer.getInt + val brokerCount = buffer.getInt + val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer)) + val brokerMap = brokers.map(b => (b.id, b)).toMap val topicCount = buffer.getInt - val topicsMetadata = new Array[TopicMetadata](topicCount) - for( i <- 0 until topicCount) { - topicsMetadata(i) = TopicMetadata.readFrom(buffer) - } - new TopicMetadataResponse(versionId, topicsMetadata.toSeq) + val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap)) + new TopicMetadataResponse(versionId, topicsMetadata, correlationId) } } case class TopicMetadataResponse(versionId: Short, - topicsMetadata: Seq[TopicMetadata]) extends RequestOrResponse -{ - val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes) + topicsMetadata: Seq[TopicMetadata], + correlationId: Int) extends RequestOrResponse { + val sizeInBytes: Int = { + val brokers = extractBrokers(topicsMetadata).values + 2 + 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum + } def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) + buffer.putInt(correlationId) + /* brokers */ + val brokers = extractBrokers(topicsMetadata).values + buffer.putInt(brokers.size) + brokers.foreach(_.writeTo(buffer)) /* topic metadata */ buffer.putInt(topicsMetadata.length) topicsMetadata.foreach(_.writeTo(buffer)) } + + def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = { + val parts = topicsMetadata.flatMap(_.partitionsMetadata) + val brokers = parts.flatMap(_.replicas) ++ parts.map(_.leader).collect{case Some(l) => l} + brokers.map(b => (b.id, b)).toMap + } } diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index cc4df5d..c61833b 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -75,8 +75,7 @@ object ClientUtils extends Logging{ val brokerInfos = brokerStr.split(":") val hostName = brokerInfos(0) val port = brokerInfos(1).toInt - val creatorId = hostName + "-" + System.currentTimeMillis() - new Broker(brokerId, creatorId, hostName, port) + new 
Broker(brokerId, hostName, port) }) } diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 6e072bf..ffedecd 100644 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -31,38 +31,32 @@ private[kafka] object Broker { if(brokerInfoString == null) throw new BrokerNotAvailableException("Broker id %s does not exist".format(id)) val brokerInfo = brokerInfoString.split(":") - new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt) + new Broker(id, brokerInfo(0), brokerInfo(1).toInt) } def readFrom(buffer: ByteBuffer): Broker = { val id = buffer.getInt - val creatorId = readShortString(buffer) val host = readShortString(buffer) val port = buffer.getInt - new Broker(id, creatorId, host, port) + new Broker(id, host, port) } } -private[kafka] case class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) { +private[kafka] case class Broker(val id: Int, val host: String, val port: Int) { - override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port) + override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port) - def getZkString(): String = new String(creatorId + ":" + host + ":" + port) + def getZkString(): String = host + ":" + port - def getConnectionString(): String = new String(host + ":" + port) + def getConnectionString(): String = host + ":" + port def writeTo(buffer: ByteBuffer) { buffer.putInt(id) - writeShortString(buffer, creatorId) writeShortString(buffer, host) buffer.putInt(port) } - def sizeInBytes: Int = { - val size = shortStringLength(creatorId) + shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ - debug("Size of broker info = " + size) - size - } + def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ override def equals(obj: Any): Boolean = { obj match { diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 7ecd11f..5e1e6ab 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -30,8 +30,12 @@ import kafka.cluster.Broker object SimpleConsumer extends Logging { - def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long, - clientId: String, isFromOrdinaryConsumer: Boolean): Long = { + def earliestOrLatestOffset(broker: Broker, + topic: String, + partitionId: Int, + earliestOrLatest: Long, + clientId: String, + isFromOrdinaryConsumer: Boolean): Long = { var simpleConsumer: SimpleConsumer = null var producedOffset: Long = -1L try { @@ -42,7 +46,7 @@ object SimpleConsumer extends Logging { new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))) else new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), - Request.DebuggingConsumerId) + 0, Request.DebuggingConsumerId) producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head } catch { case e => @@ -55,8 +59,13 @@ object SimpleConsumer extends Logging { producedOffset } - def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int, - earliestOrLatest: Long, clientId: String, isFromOrdinaryConsumer: Boolean = true): Long = { + def 
earliestOrLatestOffset(zkClient: ZkClient, + topic: String, + brokerId: Int, + partitionId: Int, + earliestOrLatest: Long, + clientId: String, + isFromOrdinaryConsumer: Boolean = true): Long = { val cluster = getCluster(zkClient) val broker = cluster.getBroker(brokerId) match { case Some(b) => b diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index ad98b75..dbf04fd 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -20,14 +20,15 @@ import kafka.api._ import java.nio.ByteBuffer import scala.collection.JavaConversions -class TopicMetadataRequest(val versionId: Short, +class TopicMetadataRequest(val correlationId: Int, + val versionId: Short, val clientId: String, val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) { val underlying: kafka.api.TopicMetadataRequest = - new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics)) + new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics), correlationId) def this(topics: java.util.List[String]) = - this(kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics) + this(0, kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics) def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer) diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index aedab42..38c0a9a 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -38,12 +38,15 @@ object Message { val KeySizeOffset = AttributesOffset + AttributesLength val KeySizeLength = 4 val KeyOffset = KeySizeOffset + KeySizeLength - val MessageOverhead = KeyOffset + val ValueSizeLength = 4 + + /** The amount of overhead bytes in a message */ + val MessageOverhead = KeyOffset + ValueSizeLength /** * The minimum valid size for the message header */ - val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength /** * The current "magic" value @@ -97,22 +100,24 @@ class Message(val buffer: ByteBuffer) { Message.AttributesLength + Message.KeySizeLength + (if(key == null) 0 else key.length) + + Message.ValueSizeLength + (if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset))) // skip crc, we will fill that in at the end - buffer.put(MagicOffset, CurrentMagicValue) - var attributes:Byte = 0 + buffer.position(MagicOffset) + buffer.put(CurrentMagicValue) + var attributes: Byte = 0 if (codec.codec > 0) attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte - buffer.put(AttributesOffset, attributes) + buffer.put(attributes) if(key == null) { - buffer.putInt(KeySizeOffset, -1) - buffer.position(KeyOffset) + buffer.putInt(-1) } else { - buffer.putInt(KeySizeOffset, key.length) - buffer.position(KeyOffset) + buffer.putInt(key.length) buffer.put(key, 0, key.length) } - buffer.put(bytes, payloadOffset, if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset) + val size = if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset + buffer.putInt(size) + buffer.put(bytes, payloadOffset, size) buffer.rewind() // now compute the checksum and fill it in @@ -171,9 
+176,14 @@ class Message(val buffer: ByteBuffer) { def hasKey: Boolean = keySize >= 0 /** + * The position where the payload size is stored + */ + private def payloadSizeOffset = Message.KeyOffset + max(0, keySize) + + /** * The length of the message value in bytes */ - def payloadSize: Int = size - KeyOffset - max(0, keySize) + def payloadSize: Int = buffer.getInt(payloadSizeOffset) /** * The magic version of this message @@ -194,29 +204,27 @@ class Message(val buffer: ByteBuffer) { /** * A ByteBuffer containing the content of the message */ - def payload: ByteBuffer = { - var payload = buffer.duplicate - payload.position(KeyOffset + max(keySize, 0)) - payload = payload.slice() - payload.limit(payloadSize) - payload.rewind() - payload - } + def payload: ByteBuffer = sliceDelimited(payloadSizeOffset) /** * A ByteBuffer containing the message key */ - def key: ByteBuffer = { - val s = keySize - if(s < 0) { + def key: ByteBuffer = sliceDelimited(KeySizeOffset) + + /** + * Read a size-delimited byte buffer starting at the given offset + */ + private def sliceDelimited(start: Int): ByteBuffer = { + val size = buffer.getInt(start) + if(size < 0) { null } else { - var key = buffer.duplicate - key.position(KeyOffset) - key = key.slice() - key.limit(s) - key.rewind() - key + var b = buffer.duplicate + b.position(start + 4) + b = b.slice() + b.limit(size) + b.rewind + b } } diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 2c9f2d1..30b1dc3 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -33,7 +33,7 @@ class ProducerConfig private (val props: VerifiableProperties) /** This is for bootstrapping and the producer will only use it for getting metadata * (topics, partitions and replicas). The socket connections for sending the actual data * will be established based on the broker information returned in the metadata. The - * format is host1:por1,host2:port2, and the list can be a subset of brokers or + * format is host1:port1,host2:port2, and the list can be a subset of brokers or * a VIP pointing to a subset of brokers. 
*/ val brokerList = props.getString("broker.list") diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala index f94415a..5ebd29a 100644 --- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala @@ -41,9 +41,6 @@ trait SyncProducerConfigShared { val maxMessageSize = props.getInt("max.message.size", 1000000) /* the client application sending the producer requests */ - val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId) - - /* the client application sending the producer requests */ val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId) /* @@ -61,7 +58,6 @@ trait SyncProducerConfigShared { } object SyncProducerConfig { - val DefaultCorrelationId = -1 val DefaultClientId = "" val DefaultRequiredAcks : Short = 0 val DefaultAckTimeoutMs = 1500 diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 9f3e2ea..7d0f609 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -39,7 +39,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig, extends EventHandler[K,V] with Logging { val isSync = ("sync" == config.producerType) - val counter = new AtomicInteger(0) + val partitionCounter = new AtomicInteger(0) + val correlationCounter = new AtomicInteger(0) val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos) private val lock = new Object() @@ -191,7 +192,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, "\n Valid values are > 0") val partition = if(key == null) - Utils.abs(counter.getAndIncrement()) % numPartitions + Utils.abs(partitionCounter.getAndIncrement()) % numPartitions else partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) @@ -212,7 +213,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic)) messagesPerTopic.keys.toSeq } else if(messagesPerTopic.size > 0) { - val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, + val producerRequest = new ProducerRequest(correlationCounter.getAndIncrement(), config.clientId, config.requiredAcks, config.requestTimeoutMs, messagesPerTopic) try { val syncProducer = producerPool.getProducer(brokerId) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b3dc79d..6c01025 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -88,7 +88,7 @@ class KafkaApis(val requestChannel: RequestChannel, case (topicAndPartition, partitionOffsetRequest) => (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null)) } - val errorResponse = OffsetResponse(apiRequest.versionId, partitionOffsetResponseMap) + val errorResponse = OffsetResponse(apiRequest.versionId, apiRequest.correlationId, partitionOffsetResponseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) error("error when handling request %s".format(apiRequest), e) case RequestKeys.MetadataKey => @@ -96,7 +96,7 @@ class KafkaApis(val 
requestChannel: RequestChannel, val topicMeatadata = apiRequest.topics.map { topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } - val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata) + val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata, apiRequest.correlationId) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) error("error when handling request %s".format(apiRequest), e) case RequestKeys.LeaderAndIsrKey => @@ -104,7 +104,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseMap = apiRequest.partitionStateInfos.map { case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } - val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, responseMap) + val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, apiRequest.correlationId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) error("error when handling request %s".format(apiRequest), e) case RequestKeys.StopReplicaKey => @@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel, case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) }.toMap error("error when handling request %s".format(apiRequest), e) - val errorResponse = StopReplicaResponse(apiRequest.versionId, responseMap) + val errorResponse = StopReplicaResponse(apiRequest.versionId, apiRequest.correlationId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } } finally @@ -127,7 +127,7 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Handling leader and ISR request " + leaderAndIsrRequest) try { val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest) - val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, response, error) + val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, response, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) } catch { case e: KafkaStorageException => @@ -144,7 +144,7 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Handling stop replica request " + stopReplicaRequest) val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) - val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response.toMap, error) + val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, response.toMap, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() @@ -409,7 +409,7 @@ class KafkaApis(val requestChannel: RequestChannel, (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) ) } }) - val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap) + val response = OffsetResponse(OffsetRequest.CurrentVersion, offsetRequest.correlationId, responseMap) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } @@ -458,7 +458,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }) topicsMetadata.foreach(metadata => 
trace("Sending topic metadata " + metadata.toString)) - val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq) + val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq, metadataRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index 67a0be8..e1c11f2 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -43,8 +43,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging { private def registerBrokerInZk() { info("Registering broker " + brokerIdPath) val hostName = config.hostName - val creatorId = hostName + "-" + System.currentTimeMillis - ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port) + ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port) } /** diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 8e40f2b..358c4fd 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -180,9 +180,9 @@ object ZkUtils extends Logging { replicas.contains(brokerId.toString) } - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) { + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id - val broker = new Broker(id, creator, host, port) + val broker = new Broker(id, host, port) try { createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString) } catch { diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 531f32e..391e724 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -75,10 +75,11 @@ object SerializationTestUtils{ TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100) ) - private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty) - private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty) - private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty) - private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty) + private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013)) + private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0) + private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1) + private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2) + private val partitionMetaData3 = new PartitionMetadata(3, Some(brokers.head), replicas = brokers, isr = brokers.tail.tail, errorCode = 3) private val 
partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3) private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) @@ -94,7 +95,7 @@ object SerializationTestUtils{ def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = { val responseMap = Map(((topic1, 0), ErrorMapping.NoError), ((topic2, 0), ErrorMapping.NoError)) - new LeaderAndIsrResponse(1, responseMap) + new LeaderAndIsrResponse(1, 1, responseMap) } def createTestStopReplicaRequest() : StopReplicaRequest = { @@ -104,7 +105,7 @@ object SerializationTestUtils{ def createTestStopReplicaResponse() : StopReplicaResponse = { val responseMap = Map(((topic1, 0), ErrorMapping.NoError), ((topic2, 0), ErrorMapping.NoError)) - new StopReplicaResponse(1, responseMap.toMap) + new StopReplicaResponse(1, 0, responseMap.toMap) } def createTestProducerRequest: ProducerRequest = { @@ -131,17 +132,17 @@ object SerializationTestUtils{ ) def createTestOffsetResponse: OffsetResponse = { - new OffsetResponse(OffsetRequest.CurrentVersion, collection.immutable.Map( + new OffsetResponse(OffsetRequest.CurrentVersion, 0, collection.immutable.Map( TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l))) ) } def createTestTopicMetadataRequest: TopicMetadataRequest = { - new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2)) + new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2), 1) } def createTestTopicMetadataResponse: TopicMetadataResponse = { - new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2)) + new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2), 1) } } diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index bb39e09..246b1ec 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -47,7 +47,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val group = "group1" val consumer0 = "consumer0" val consumedOffset = 5 - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port))) + val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, c.brokerId, diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index dec0453..021f419 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -40,7 +40,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { yield new KafkaConfig(props) val messages = new mutable.HashMap[Int, Seq[Array[Byte]]] val topic = "topic" - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port))) + val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) val shutdown = ZookeeperConsumerConnector.shutdownCommand val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala 
b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index b6bab2d..c6ea3b6 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -92,7 +92,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { log.flush() val offsets = log.getOffsetsBefore(OffsetRequest.LatestTime, 10) - assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets) + assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets) waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) val topicAndPartition = TopicAndPartition(topic, part) @@ -101,7 +101,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { replicaId = 0) val consumerOffsets = simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets - assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets) + assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets) // try to fetch using latest offset val fetchResponse = simpleConsumer.fetch( @@ -157,14 +157,14 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val now = time.milliseconds val offsets = log.getOffsetsBefore(now, 10) - assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets) + assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets) waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000) val topicAndPartition = TopicAndPartition(topic, part) val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0) val consumerOffsets = simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets - assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets) + assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets) } @Test diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 9074ca8..3b5ec7f 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -74,7 +74,7 @@ class SocketServerTest extends JUnitSuite { @Test def simpleRequest() { val socket = connect() - val correlationId = SyncProducerConfig.DefaultCorrelationId + val correlationId = -1 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ack = SyncProducerConfig.DefaultRequiredAcks diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index d67abe9..534df19 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -168,8 +168,8 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - val broker1 = new Broker(0, "localhost", "localhost", 9092) - val broker2 = new Broker(1, "localhost", "localhost", 9093) + val broker1 = new Broker(0, "localhost", 9092) + val broker2 = new Broker(1, "localhost", 9093) broker1 // form expected partitions metadata val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2)) @@ -427,17 +427,18 @@ class AsyncProducerTest extends JUnit3Suite { // produce request for topic1 and partitions 0 and 1. Let the first request fail // entirely. The second request will succeed for partition 1 but fail for partition 0. 
// On the third try for partition 0, let it succeed. - val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0) + val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 0) + val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 1) val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)), (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) - val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs)) + val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 2) val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException - EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1) - EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response2) + EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1) + EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2) EasyMock.replay(mockSyncProducer) val producerPool = EasyMock.createMock(classOf[ProducerPool]) @@ -510,7 +511,7 @@ class AsyncProducerTest extends JUnit3Suite { } private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = { - val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort) + val broker1 = new Broker(brokerId, brokerHost, brokerPort) new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1)))) } diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 744554c..b289dda 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -81,7 +81,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("connect.timeout.ms", "300") props.put("reconnect.interval", "500") props.put("max.message.size", "100") - val correlationId = SyncProducerConfig.DefaultCorrelationId + val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ack = SyncProducerConfig.DefaultRequiredAcks @@ -98,9 +98,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = new Properties() props.put("host", "localhost") props.put("port", server.socketServer.port.toString) - props.put("buffer.size", "102400") - props.put("connect.timeout.ms", "300") - props.put("reconnect.interval", "500") + props.put("max.message.size", 50000.toString) val producer = new SyncProducer(new SyncProducerConfig(props)) CreateTopicCommand.createTopic(zkClient, "test", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500) diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 19b90a8..fcdd26e 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -123,7 +123,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // start another controller val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort())) - val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port)) + val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port)) val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig) controllerChannelManager.startup() val staleControllerEpoch = 0 diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 3d4f3f2..a145f2a 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -24,6 +24,7 @@ import java.nio.channels._ import java.util.Random import java.util.Properties import junit.framework.Assert._ +import kafka.api._ import kafka.server._ import kafka.producer._ import kafka.message._ @@ -333,13 +334,13 @@ object TestUtils extends Logging { } def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667)) - brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.creatorId, b.port)) + val brokers = ids.map(id => new Broker(id, "localhost", 6667)) + brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port)) brokers } def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667)) + val brokers = ids.map(id => new Broker(id, "localhost", 6667)) brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b)) brokers } @@ -354,22 +355,27 @@ object TestUtils extends Logging { /** * Create a wired format request based on simple basic information */ - def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { - produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message) - } - - def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { - produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks) - } - - def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet, acks: Int): kafka.api.ProducerRequest = { - val correlationId = SyncProducerConfig.DefaultCorrelationId - val clientId = SyncProducerConfig.DefaultClientId - val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs + def produceRequest(topic: String, + partition: Int, + message: ByteBufferMessageSet, + acks: Int = SyncProducerConfig.DefaultRequiredAcks, + timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs, + correlationId: Int = 0, + clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = { + produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId) + } + + def produceRequestWithAcks(topics: Seq[String], + partitions: Seq[Int], + message: ByteBufferMessageSet, + acks: Int = SyncProducerConfig.DefaultRequiredAcks, + timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs, + correlationId: Int = 0, + clientId: String = 
SyncProducerConfig.DefaultClientId): ProducerRequest = { val data = topics.flatMap(topic => partitions.map(partition => (TopicAndPartition(topic, partition), message)) ) - new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*)) + new ProducerRequest(correlationId, clientId, acks.toShort, timeout, Map(data:_*)) } def makeLeaderForPartition(zkClient: ZkClient, topic: String,
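
Note (illustration, not part of the patch): every request and response touched above gains the same header pattern — a 2-byte version id, a new 4-byte correlation id that the broker echoes back unchanged, and a length-prefixed client id. A minimal standalone sketch of that round trip, assuming made-up names (RequestHeader is not a class in this patch):

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    case class RequestHeader(versionId: Short, correlationId: Int, clientId: String) {
      private def clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8)

      def sizeInBytes: Int = 2 + 4 + 2 + clientIdBytes.length

      def writeTo(buffer: ByteBuffer): Unit = {
        buffer.putShort(versionId)
        buffer.putInt(correlationId)                  // echoed back verbatim in the response
        buffer.putShort(clientIdBytes.length.toShort) // "short string": 2-byte length + UTF-8 bytes
        buffer.put(clientIdBytes)
      }
    }

    object RequestHeader {
      def readFrom(buffer: ByteBuffer): RequestHeader = {
        val versionId = buffer.getShort
        val correlationId = buffer.getInt
        val clientIdLength = buffer.getShort
        val bytes = new Array[Byte](clientIdLength)
        buffer.get(bytes)
        RequestHeader(versionId, correlationId, new String(bytes, StandardCharsets.UTF_8))
      }
    }

    // A client can stamp each outgoing request from a counter and use the echoed id
    // to match a response to the request that produced it, e.g.
    //   RequestHeader(versionId = 1, correlationId = counter.getAndIncrement(), clientId = "my-client")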
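
Note (illustration, not part of the patch): TopicMetadataResponse now carries the broker list once at the top of the response, and PartitionMetadata refers to brokers by 4-byte id (with -1 meaning no leader) instead of embedding host/port for every replica. A hedged sketch of that layout under assumed names (PartitionInfo is illustrative only):

    import java.nio.ByteBuffer

    case class PartitionInfo(errorCode: Short,
                             partitionId: Int,
                             leaderId: Int,          // -1 when no leader is known
                             replicaIds: Seq[Int],
                             isrIds: Seq[Int]) {
      def sizeInBytes: Int = 2 + 4 + 4 + (4 + 4 * replicaIds.size) + (4 + 4 * isrIds.size)

      def writeTo(buffer: ByteBuffer): Unit = {
        buffer.putShort(errorCode)
        buffer.putInt(partitionId)
        buffer.putInt(leaderId)
        buffer.putInt(replicaIds.size)
        replicaIds.foreach(id => buffer.putInt(id))
        buffer.putInt(isrIds.size)
        isrIds.foreach(id => buffer.putInt(id))
      }
    }

    object PartitionInfo {
      def readFrom(buffer: ByteBuffer): PartitionInfo = {
        val errorCode = buffer.getShort
        val partitionId = buffer.getInt
        val leaderId = buffer.getInt
        val replicaIds = (0 until buffer.getInt).map(_ => buffer.getInt)
        val isrIds = (0 until buffer.getInt).map(_ => buffer.getInt)
        PartitionInfo(errorCode, partitionId, leaderId, replicaIds, isrIds)
      }
    }

    // The caller resolves ids against the broker map decoded from the head of the
    // response, e.g. brokerMap.get(info.leaderId) yields the leader's host and port.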
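
Note (illustration, not part of the patch): Message now stores an explicit 4-byte value size after the key, so both key and value are read the same way as size-delimited fields, with -1 meaning "absent". A simplified sketch of that pattern (helper names are assumptions, not Kafka APIs):

    import java.nio.ByteBuffer

    object SizeDelimited {
      // Write a 4-byte length prefix followed by the bytes; -1 encodes a null field.
      def write(buffer: ByteBuffer, bytes: Array[Byte]): Unit = {
        if (bytes == null) {
          buffer.putInt(-1)
        } else {
          buffer.putInt(bytes.length)
          buffer.put(bytes)
        }
      }

      // Read a size-delimited slice starting at an absolute offset without
      // disturbing the buffer's position, mirroring the sliceDelimited helper above.
      def sliceAt(buffer: ByteBuffer, start: Int): ByteBuffer = {
        val size = buffer.getInt(start)
        if (size < 0) {
          null
        } else {
          val b = buffer.duplicate()
          b.position(start + 4)
          val slice = b.slice()
          slice.limit(size)
          slice
        }
      }
    }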