diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 4ed071a..b4fb874 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -30,7 +30,7 @@ case class PartitionFetchInfo(offset: Long, fetchSize: Int) object FetchRequest { - val CurrentVersion = 1.shortValue() + val CurrentVersion = 0.shortValue val DefaultMaxWait = 0 val DefaultMinBytes = 0 val ReplicaFetcherClientId = "replica-fetcher" diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 0f989fe..94650f1 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -136,12 +136,10 @@ class TopicDataSend(val topicData: TopicData) extends Send { object FetchResponse { val headerSize = - 2 + /* versionId */ 4 + /* correlationId */ 4 /* topic count */ def readFrom(buffer: ByteBuffer): FetchResponse = { - val versionId = buffer.getShort val correlationId = buffer.getInt val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { @@ -151,13 +149,12 @@ object FetchResponse { (TopicAndPartition(topicData.topic, partitionId), partitionData) } }) - FetchResponse(versionId, correlationId, Map(pairs:_*)) + FetchResponse(correlationId, Map(pairs:_*)) } } -case class FetchResponse(versionId: Short, - correlationId: Int, +case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData]) { /** @@ -206,7 +203,6 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize) buffer.putInt(size) - buffer.putShort(fetchResponse.versionId) buffer.putInt(fetchResponse.correlationId) buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count buffer.rewind() diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 7459f4a..9759949 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -79,7 +79,7 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr } object LeaderAndIsrRequest { - val CurrentVersion = 1.shortValue() + val CurrentVersion = 0.shortValue val DefaultClientId = "" val IsInit: Boolean = true val NotInit: Boolean = false diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala index c8f1630..dbd85d0 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala @@ -26,7 +26,6 @@ import collection.Map object LeaderAndIsrResponse { def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = { - val versionId = buffer.getShort val correlationId = buffer.getInt val errorCode = buffer.getShort val numEntries = buffer.getInt @@ -37,19 +36,17 @@ object LeaderAndIsrResponse { val partitionErrorCode = buffer.getShort responseMap.put((topic, partition), partitionErrorCode) } - new LeaderAndIsrResponse(versionId, correlationId, responseMap, errorCode) + new LeaderAndIsrResponse(correlationId, responseMap, errorCode) } } -case class LeaderAndIsrResponse(versionId: Short, - correlationId: Int, +case class LeaderAndIsrResponse(correlationId: Int, responseMap: Map[(String, Int), Short], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse { def sizeInBytes(): Int ={ var size = - 2 /* version id */ + 4 /* correlation id */ + 2 /* error code */ + 4 /* number of responses */ @@ -63,7 +60,6 @@ case class LeaderAndIsrResponse(versionId: Short, } def writeTo(buffer: ByteBuffer) { - buffer.putShort(versionId) buffer.putInt(correlationId) buffer.putShort(errorCode) buffer.putInt(responseMap.size) diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 5239538..6c522bc 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -23,7 +23,7 @@ import kafka.api.ApiUtils._ object OffsetRequest { - val CurrentVersion = 1.shortValue() + val CurrentVersion = 0.shortValue val DefaultClientId = "" val SmallestTimeString = "smallest" diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala index 7818b66..264e200 100644 --- a/core/src/main/scala/kafka/api/OffsetResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetResponse.scala @@ -25,7 +25,6 @@ import kafka.api.ApiUtils._ object OffsetResponse { def readFrom(buffer: ByteBuffer): OffsetResponse = { - val versionId = buffer.getShort val correlationId = buffer.getInt val numTopics = buffer.getInt val pairs = (1 to numTopics).flatMap(_ => { @@ -39,7 +38,7 @@ object OffsetResponse { (TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets)) }) }) - OffsetResponse(versionId, correlationId, Map(pairs:_*)) + OffsetResponse(correlationId, Map(pairs:_*)) } } @@ -48,8 +47,7 @@ object OffsetResponse { case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) -case class OffsetResponse(versionId: Short, - correlationId: Int, +case class OffsetResponse(correlationId: Int, partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse]) extends RequestOrResponse { @@ -58,7 +56,6 @@ case class OffsetResponse(versionId: Short, def hasError = partitionErrorAndOffsets.values.exists(_.error != ErrorMapping.NoError) val sizeInBytes = { - 2 + /* versionId */ 4 + /* correlation id */ 4 + /* topic count */ offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { @@ -77,7 +74,6 @@ case class OffsetResponse(versionId: Short, } def writeTo(buffer: ByteBuffer) { - buffer.putShort(versionId) buffer.putInt(correlationId) buffer.putInt(offsetsGroupedByTopic.size) // topic count offsetsGroupedByTopic.foreach { diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 87700a0..9edc4dd 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -25,7 +25,7 @@ import kafka.api.ApiUtils._ object ProducerRequest { - val CurrentVersion: Short = 0 + val CurrentVersion = 0.shortValue def readFrom(buffer: ByteBuffer): ProducerRequest = { val versionId: Short = buffer.getShort diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 62c9bc4..743227d 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -25,7 +25,6 @@ import kafka.api.ApiUtils._ object ProducerResponse { def readFrom(buffer: ByteBuffer): ProducerResponse = { - val versionId = buffer.getShort val correlationId = buffer.getInt val topicCount = buffer.getInt val statusPairs = (1 to topicCount).flatMap(_ => { @@ -39,15 +38,14 @@ object ProducerResponse { }) }) - ProducerResponse(versionId, correlationId, Map(statusPairs:_*)) + ProducerResponse(correlationId, Map(statusPairs:_*)) } } case class ProducerResponseStatus(error: Short, offset: Long) -case class ProducerResponse(versionId: Short, - correlationId: Int, +case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse { /** @@ -59,7 +57,6 @@ case class ProducerResponse(versionId: Short, val sizeInBytes = { val groupedStatus = statusGroupedByTopic - 2 + /* version id */ 4 + /* correlation id */ 4 + /* topic count */ groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => { @@ -76,8 +73,6 @@ case class ProducerResponse(versionId: Short, def writeTo(buffer: ByteBuffer) { val groupedStatus = statusGroupedByTopic - - buffer.putShort(versionId) buffer.putInt(correlationId) buffer.putInt(groupedStatus.size) // topic count diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 6583d64..deb195f 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -25,7 +25,7 @@ import kafka.network.InvalidRequestException object StopReplicaRequest extends Logging { - val CurrentVersion = 1.shortValue() + val CurrentVersion = 0.shortValue val DefaultClientId = "" val DefaultAckTimeout = 100 diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala index 6062a0c..fa66b99 100644 --- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala +++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala @@ -26,7 +26,6 @@ import kafka.api.ApiUtils._ object StopReplicaResponse { def readFrom(buffer: ByteBuffer): StopReplicaResponse = { - val versionId = buffer.getShort val correlationId = buffer.getInt val errorCode = buffer.getShort val numEntries = buffer.getInt @@ -38,18 +37,16 @@ object StopReplicaResponse { val partitionErrorCode = buffer.getShort() responseMap.put((topic, partition), partitionErrorCode) } - new StopReplicaResponse(versionId, correlationId, responseMap.toMap, errorCode) + new StopReplicaResponse(correlationId, responseMap.toMap, errorCode) } } -case class StopReplicaResponse(val versionId: Short, - val correlationId: Int, +case class StopReplicaResponse(val correlationId: Int, val responseMap: Map[(String, Int), Short], val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{ def sizeInBytes(): Int ={ var size = - 2 /* version id */ + 4 /* correlation id */ + 2 /* error code */ + 4 /* number of responses */ @@ -63,7 +60,6 @@ case class StopReplicaResponse(val versionId: Short, } def writeTo(buffer: ByteBuffer) { - buffer.putShort(versionId) buffer.putInt(correlationId) buffer.putShort(errorCode) buffer.putInt(responseMap.size) diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 0a99779..5bdb2c1 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -23,7 +23,7 @@ import collection.mutable.ListBuffer import kafka.utils.Logging object TopicMetadataRequest extends Logging { - val CurrentVersion = 1.shortValue() + val CurrentVersion = 0.shortValue val DefaultClientId = "" /** diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala index 1bf4cc4..af76776 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala @@ -23,27 +23,24 @@ import java.nio.ByteBuffer object TopicMetadataResponse { def readFrom(buffer: ByteBuffer): TopicMetadataResponse = { - val versionId = buffer.getShort val correlationId = buffer.getInt val brokerCount = buffer.getInt val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer)) val brokerMap = brokers.map(b => (b.id, b)).toMap val topicCount = buffer.getInt val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap)) - new TopicMetadataResponse(versionId, topicsMetadata, correlationId) + new TopicMetadataResponse(topicsMetadata, correlationId) } } -case class TopicMetadataResponse(versionId: Short, - topicsMetadata: Seq[TopicMetadata], +case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata], correlationId: Int) extends RequestOrResponse { val sizeInBytes: Int = { val brokers = extractBrokers(topicsMetadata).values - 2 + 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum + 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum } def writeTo(buffer: ByteBuffer) { - buffer.putShort(versionId) buffer.putInt(correlationId) /* brokers */ val brokers = extractBrokers(topicsMetadata).values diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d7a5736..ac90b20 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -70,7 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel, case (topicAndPartition, data) => (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) } - val errorResponse = ProducerResponse(apiRequest.versionId, apiRequest.correlationId, producerResponseStatus) + val errorResponse = ProducerResponse(apiRequest.correlationId, producerResponseStatus) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) error("error when handling request %s".format(apiRequest), e) case RequestKeys.FetchKey => @@ -79,7 +79,7 @@ class KafkaApis(val requestChannel: RequestChannel, case (topicAndPartition, data) => (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null)) } - val errorResponse = FetchResponse(apiRequest.versionId, apiRequest.correlationId, fetchResponsePartitionData) + val errorResponse = FetchResponse(apiRequest.correlationId, fetchResponsePartitionData) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse))) error("error when handling request %s".format(apiRequest), e) case RequestKeys.OffsetsKey => @@ -88,7 +88,7 @@ class KafkaApis(val requestChannel: RequestChannel, case (topicAndPartition, partitionOffsetRequest) => (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null)) } - val errorResponse = OffsetResponse(apiRequest.versionId, apiRequest.correlationId, partitionOffsetResponseMap) + val errorResponse = OffsetResponse(apiRequest.correlationId, partitionOffsetResponseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) error("error when handling request %s".format(apiRequest), e) case RequestKeys.MetadataKey => @@ -96,7 +96,7 @@ class KafkaApis(val requestChannel: RequestChannel, val topicMeatadata = apiRequest.topics.map { topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } - val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata, apiRequest.correlationId) + val errorResponse = TopicMetadataResponse(topicMeatadata, apiRequest.correlationId) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) error("error when handling request %s".format(apiRequest), e) case RequestKeys.LeaderAndIsrKey => @@ -104,7 +104,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseMap = apiRequest.partitionStateInfos.map { case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } - val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, apiRequest.correlationId, responseMap) + val errorResponse = LeaderAndIsrResponse(apiRequest.correlationId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) error("error when handling request %s".format(apiRequest), e) case RequestKeys.StopReplicaKey => @@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel, case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) }.toMap error("error when handling request %s".format(apiRequest), e) - val errorResponse = StopReplicaResponse(apiRequest.versionId, apiRequest.correlationId, responseMap) + val errorResponse = StopReplicaResponse(apiRequest.correlationId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } } finally @@ -127,7 +127,7 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Handling leader and ISR request " + leaderAndIsrRequest) try { val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest) - val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, response, error) + val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) } catch { case e: KafkaStorageException => @@ -144,7 +144,7 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Handling stop replica request " + stopReplicaRequest) val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) - val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, response.toMap, error) + val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() @@ -161,7 +161,7 @@ class KafkaApis(val requestChannel: RequestChannel, // send any newly unblocked responses for(fetchReq <- satisfied) { val topicData = readMessageSets(fetchReq.fetch) - val response = FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData) + val response = FetchResponse(fetchReq.fetch.correlationId, topicData) requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) } } @@ -192,7 +192,7 @@ class KafkaApis(val requestChannel: RequestChannel, allPartitionHaveReplicationFactorOne || numPartitionsInError == produceRequest.numPartitions) { val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap - val response = ProducerResponse(produceRequest.versionId, produceRequest.correlationId, statuses) + val response = ProducerResponse(produceRequest.correlationId, statuses) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } else { // create a list of (topic, partition) pairs to use as keys for this delayed request @@ -292,7 +292,7 @@ class KafkaApis(val requestChannel: RequestChannel, bytesReadable >= fetchRequest.minBytes || fetchRequest.numPartitions <= 0) { debug("Returning fetch response %s for fetch request with correlation id %d".format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId)) - val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, dataRead) + val response = new FetchResponse(fetchRequest.correlationId, dataRead) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { debug("Putting fetch request into purgatory") @@ -408,7 +408,7 @@ class KafkaApis(val requestChannel: RequestChannel, (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) ) } }) - val response = OffsetResponse(OffsetRequest.CurrentVersion, offsetRequest.correlationId, responseMap) + val response = OffsetResponse(offsetRequest.correlationId, responseMap) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } @@ -457,7 +457,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }) topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString)) - val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq, metadataRequest.correlationId) + val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } @@ -514,7 +514,7 @@ class KafkaApis(val requestChannel: RequestChannel, debug("Expiring fetch request %s.".format(delayed.fetch)) try { val topicData = readMessageSets(delayed.fetch) - val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) + val response = FetchResponse(delayed.fetch.correlationId, topicData) val fromFollower = delayed.fetch.isFromFollower delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) @@ -564,7 +564,7 @@ class KafkaApis(val requestChannel: RequestChannel, (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset)) }) - val response = ProducerResponse(produce.versionId, produce.correlationId, finalErrorsAndOffsets) + val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets) requestChannel.sendResponse(new RequestChannel.Response( request, new BoundedByteBufferSend(response))) diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 391e724..509b020 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -95,7 +95,7 @@ object SerializationTestUtils{ def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = { val responseMap = Map(((topic1, 0), ErrorMapping.NoError), ((topic2, 0), ErrorMapping.NoError)) - new LeaderAndIsrResponse(1, 1, responseMap) + new LeaderAndIsrResponse(1, responseMap) } def createTestStopReplicaRequest() : StopReplicaRequest = { @@ -105,7 +105,7 @@ object SerializationTestUtils{ def createTestStopReplicaResponse() : StopReplicaResponse = { val responseMap = Map(((topic1, 0), ErrorMapping.NoError), ((topic2, 0), ErrorMapping.NoError)) - new StopReplicaResponse(1, 0, responseMap.toMap) + new StopReplicaResponse(0, responseMap.toMap) } def createTestProducerRequest: ProducerRequest = { @@ -113,7 +113,7 @@ object SerializationTestUtils{ } def createTestProducerResponse: ProducerResponse = - ProducerResponse(1, 1, Map( + ProducerResponse(1, Map( TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001), TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001) )) @@ -123,7 +123,7 @@ object SerializationTestUtils{ } def createTestFetchResponse: FetchResponse = { - FetchResponse(1, 1, topicDataFetchResponse) + FetchResponse(1, topicDataFetchResponse) } def createTestOffsetRequest = new OffsetRequest( @@ -132,7 +132,7 @@ object SerializationTestUtils{ ) def createTestOffsetResponse: OffsetResponse = { - new OffsetResponse(OffsetRequest.CurrentVersion, 0, collection.immutable.Map( + new OffsetResponse(0, collection.immutable.Map( TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l))) ) } @@ -142,7 +142,7 @@ object SerializationTestUtils{ } def createTestTopicMetadataResponse: TopicMetadataResponse = { - new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2), 1) + new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1) } } diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 534df19..90a7ed8 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -429,11 +429,11 @@ class AsyncProducerTest extends JUnit3Suite { // On the third try for partition 0, let it succeed. val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 0) val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 1) - val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0, + val response1 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)), (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 2) - val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0, + val response2 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException