diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index fbfc9d3..0ee6ea0 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -32,6 +32,8 @@ object RequestKeys { val OffsetCommitKey: Short = 8 val OffsetFetchKey: Short = 9 val ConsumerMetadataKey: Short = 10 + val TransactionKey: Short = 11 + val TxCoordinatorMetadataKey: Short = 12 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), @@ -44,7 +46,9 @@ object RequestKeys { ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom)) + ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom), + TransactionKey -> ("TransactionRequest", TransactionRequest.readFrom), + TxCoordinatorMetadataKey -> ("TxCoordinatorMetadata", TxCoordinatorMetadataRequest.readFrom)) def nameForKey(key: Short): String = { keyToNameAndDeserializerMap.get(key) match { diff --git a/core/src/main/scala/kafka/api/TransactionRequest.scala b/core/src/main/scala/kafka/api/TransactionRequest.scala new file mode 100644 index 0000000..c315bc3 --- /dev/null +++ b/core/src/main/scala/kafka/api/TransactionRequest.scala @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio._ +import kafka.api.ApiUtils._ +import kafka.common._ +import kafka.network.RequestChannel.Response +import kafka.network.{RequestChannel, BoundedByteBufferSend} +import collection.mutable.{LinkedHashMap, LinkedHashSet} + + +object TransactionRequest { + val CurrentVersion = 0.shortValue + + def readFrom(buffer: ByteBuffer): TransactionRequest = { + val versionId: Short = buffer.getShort + val correlationId: Int = buffer.getInt + val clientId: String = readShortString(buffer) + val ackTimeoutMs: Int = buffer.getInt + val requestInfo = TransactionRequestInfo.readFrom(buffer) + + TransactionRequest(versionId, correlationId, clientId, ackTimeoutMs, requestInfo) + } + + def TransactionWithNewControl(oldTxRequest: TransactionRequest, newTxControl: Short): TransactionRequest = { + oldTxRequest.copy(requestInfo = oldTxRequest.requestInfo.copy(txControl = newTxControl)) + } +} + +object TxControlTypes { + val Ongoing: Short = 0 + val Begin: Short = 1 + val PreCommit: Short = 2 + val Commit: Short = 3 + val Committed: Short = 4 + val PreAbort: Short = 5 + val Abort: Short = 6 + val Aborted: Short = 7 +} + + +case class TransactionRequest(versionId: Short = TransactionRequest.CurrentVersion, + override val correlationId: Int, + clientId: String, + ackTimeoutMs: Int, + requestInfo: TransactionRequestInfo) + extends RequestOrResponse(Some(RequestKeys.TransactionKey), correlationId) { + + def this(correlationId: Int, + clientId: String, + ackTimeoutMs: Int, + requestInfo: TransactionRequestInfo) = + this(TransactionRequest.CurrentVersion, correlationId, clientId, ackTimeoutMs, requestInfo) + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + buffer.putInt(correlationId) + writeShortString(buffer, clientId) + buffer.putInt(ackTimeoutMs) + requestInfo.writeTo(buffer) + } + + def sizeInBytes: Int = { + 2 + /* versionId */ + 4 + /* correlationId */ + shortStringLength(clientId) + /* client id */ + 4 + /* ackTimeoutMs */ + requestInfo.sizeInBytes + } + + override def toString(): String = { + describe(true) + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + + val transactionResponseStatus = requestInfo.txPartitions.map { + topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + } + val errorResponse = TransactionResponse(correlationId, requestInfo.txId, transactionResponseStatus.toMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } + + def responseFor(status: Map[TopicAndPartition, Short]) = { + TransactionResponse(correlationId, requestInfo.txId, status); + } + + override def describe(details: Boolean): String = { + val transactionRequest = new StringBuilder + transactionRequest.append("Name: " + this.getClass.getSimpleName) + transactionRequest.append("; Version: " + versionId) + transactionRequest.append("; CorrelationId: " + correlationId) + transactionRequest.append("; ClientId: " + clientId) + transactionRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") + if (details) + transactionRequest.append("; requestInfos: " + requestInfo) + transactionRequest.toString() + } +} + +object TransactionRequestInfo { + + def readFrom(buffer: ByteBuffer): TransactionRequestInfo = { + val txGroupId: String = readShortString(buffer) + val txId: Int = buffer.getInt + val txControl: Short = buffer.getShort + val txTimeoutMs: Int = buffer.getInt + + val topicCount = buffer.getInt + val txPartitions = (1 to topicCount).flatMap(_ => { + val topic = readShortString(buffer) + val partitionCount = buffer.getInt + (1 to partitionCount).map(_ => { + val partition = buffer.getInt + TopicAndPartition(topic, partition) + }) + }).toList + + TransactionRequestInfo(txGroupId, txId, txControl, txTimeoutMs, txPartitions) + } +} + +case class TransactionRequestInfo(txGroupId: String, txId: Int, txControl: Short, txTimeoutMs: Int, + txPartitions: Seq[TopicAndPartition]) { + + private lazy val partitionGroupedByTopicOrdered = { + val map = LinkedHashMap[String, LinkedHashSet[TopicAndPartition] ]() + for (txPartition <- txPartitions) { + val key = txPartition.topic + map(key) = map.lift(key).getOrElse(LinkedHashSet()) + txPartition + } + map + } + + def getTopicPartitionToAppendBroker: Option[TopicAndPartition] = { + txPartitions.headOption + } + + def sizeInBytes: Int = { + shortStringLength(txGroupId) + /* groupId */ + 4 + /* txId */ + 2 + /* txControl */ + 4 + /* txTimeoutMs */ + 4 + /* number of topics */ + partitionGroupedByTopicOrdered.foldLeft(0)((foldedTopics, currTopic) => { + foldedTopics + + shortStringLength(currTopic._1) + /* topic */ + 4 + /* number of partitions */ + 4 * currTopic._2.size /* partitions */ + }) + } + + def writeTo(buffer: ByteBuffer) { + writeShortString(buffer, txGroupId) + buffer.putInt(txId) + buffer.putShort(txControl) + buffer.putInt(txTimeoutMs) + buffer.putInt(partitionGroupedByTopicOrdered.size) + partitionGroupedByTopicOrdered.foreach { + case (topic, topicAndPartitions) => + writeShortString(buffer, topic) //write the topic + buffer.putInt(topicAndPartitions.size) //the number of partitions + topicAndPartitions.foreach {topicAndPartition: TopicAndPartition => + buffer.putInt(topicAndPartition.partition) + } + } + } + + override def toString(): String = { + val requestInfo = new StringBuilder + requestInfo.append("gId: " + txGroupId) + requestInfo.append("; txId: " + txId) + requestInfo.append("; txControl: " + txControl) + requestInfo.append("; txTimeoutMs: " + txTimeoutMs) + requestInfo.append("; TopicAndPartition: (" + txPartitions.mkString(",") + ")") + requestInfo.toString() + } + +} diff --git a/core/src/main/scala/kafka/api/TransactionResponse.scala b/core/src/main/scala/kafka/api/TransactionResponse.scala new file mode 100644 index 0000000..716c013 --- /dev/null +++ b/core/src/main/scala/kafka/api/TransactionResponse.scala @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.api.ApiUtils._ +import kafka.common.{TopicAndPartition, ErrorMapping} + +import scala.collection.Map + +object TransactionResponse { + def readFrom(buffer: ByteBuffer): TransactionResponse = { + + val correlationId = buffer.getInt + val txId = buffer.getInt + val topicCount = buffer.getInt + val statusPairs = (1 to topicCount).flatMap(_ => { + val topic = readShortString(buffer) + val partitionCount = buffer.getInt + (1 to partitionCount).map(_ => { + val partition = buffer.getInt + val error = buffer.getShort + (TopicAndPartition(topic, partition), error) + }) + }) + TransactionResponse(correlationId, txId, Map(statusPairs:_*)) + } +} + + +case class TransactionResponse(override val correlationId: Int, + txId: Int, + status: Map[TopicAndPartition, Short]) + extends RequestOrResponse(correlationId = correlationId) { + + private lazy val statusGroupedByTopic = status.groupBy(_._1.topic) + + def hasError = status.values.exists(_ != ErrorMapping.NoError) + + val sizeInBytes = { + val groupedStatus = statusGroupedByTopic + 4 + /* correlation id */ + 4 + /* transaction id */ + 4 + /* topic count */ + groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => { + foldedTopics + + shortStringLength(currTopic._1) + + 4 + /* partition count for this topic */ + currTopic._2.size * { + 4 + /* partition id */ + 2 /* error code */ + } + }) + } + + def writeTo(buffer: ByteBuffer) { + val groupedStatus = statusGroupedByTopic + buffer.putInt(correlationId) + buffer.putInt(txId) + buffer.putInt(groupedStatus.size) // topic count + + groupedStatus.foreach(topicStatus => { + val (topic, errors) = topicStatus + writeShortString(buffer, topic) + buffer.putInt(errors.size) // partition count + errors.foreach { + case (TopicAndPartition(_, partition), error) => + buffer.putInt(partition) + buffer.putShort(error) + } + }) + } + + override def describe(details: Boolean):String = { toString } +} diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala new file mode 100644 index 0000000..ac7d8b8 --- /dev/null +++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.network.RequestChannel.Response +import kafka.common.ErrorMapping + +object TxCoordinatorMetadataRequest { + val CurrentVersion = 0.shortValue + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer) = { + // envelope + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = ApiUtils.readShortString(buffer) + + // request + val txGroupId = ApiUtils.readShortString(buffer) + TxCoordinatorMetadataRequest(txGroupId, versionId, correlationId, clientId) + } + +} + +case class TxCoordinatorMetadataRequest(txGroupId: String, + versionId: Short = TxCoordinatorMetadataRequest.CurrentVersion, + override val correlationId: Int = 0, + clientId: String = TxCoordinatorMetadataRequest.DefaultClientId) + extends RequestOrResponse(Some(RequestKeys.TxCoordinatorMetadataKey), correlationId) { + + def sizeInBytes = + 2 + /* versionId */ + 4 + /* correlationId */ + ApiUtils.shortStringLength(clientId) + + ApiUtils.shortStringLength(txGroupId) + + def writeTo(buffer: ByteBuffer) { + // envelope + buffer.putShort(versionId) + buffer.putInt(correlationId) + ApiUtils.writeShortString(buffer, clientId) + + // transaction coordinator metadata request + ApiUtils.writeShortString(buffer, txGroupId) + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + // return TransactionCoordinatorNotAvailable for all uncaught errors + val errorResponse = TxCoordinatorMetadataResponse(None, ErrorMapping.TxCoordinatorNotAvailableCode) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } + + def describe(details: Boolean) = { + val transactionMetadataRequest = new StringBuilder + transactionMetadataRequest.append("Name: " + this.getClass.getSimpleName) + transactionMetadataRequest.append("; Version: " + versionId) + transactionMetadataRequest.append("; CorrelationId: " + correlationId) + transactionMetadataRequest.append("; ClientId: " + clientId) + transactionMetadataRequest.append("; Group: " + txGroupId) + transactionMetadataRequest.toString() + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala new file mode 100644 index 0000000..b43f389 --- /dev/null +++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.cluster.Broker +import kafka.common.ErrorMapping + +object TxCoordinatorMetadataResponse { + val CurrentVersion = 0 + + private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1)) + + def readFrom(buffer: ByteBuffer) = { + val correlationId = buffer.getInt + val errorCode = buffer.getShort + val coordinatorOpt = if (errorCode == ErrorMapping.NoError) + Some(Broker.readFrom(buffer)) + else + None + + TxCoordinatorMetadataResponse(coordinatorOpt, errorCode, correlationId) + } + +} + +case class TxCoordinatorMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, override val correlationId: Int = 0) + extends RequestOrResponse(correlationId = correlationId) { + + def sizeInBytes = + 4 + /* correlationId */ + 2 + /* error code */ + coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).get.sizeInBytes + + def writeTo(buffer: ByteBuffer) { + buffer.putInt(correlationId) + buffer.putShort(errorCode) + coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer)) + } + + def describe(details: Boolean) = toString +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 5559d26..b185421 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -46,6 +46,7 @@ object ErrorMapping { val OffsetsLoadInProgressCode: Short = 14 val ConsumerCoordinatorNotAvailableCode: Short = 15 val NotCoordinatorForConsumerCode: Short = 16 + val TxCoordinatorNotAvailableCode: Short = 17 private val exceptionToCode = Map[Class[Throwable], Short]( diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index a2117b3..5eb40a8 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -180,6 +180,25 @@ object SerializationTestUtils { ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError) } + def createTestTransactionRequest: TransactionRequest = { + new TransactionRequest(1, "client 1", 1000, + TransactionRequestInfo("group 1", 1, 1, 1000, Seq( + TopicAndPartition(topic1, 0), TopicAndPartition(topic2, 0) + ))) + } + + def createTestTransactionResponse: TransactionResponse = { + TransactionResponse(1, 1, 0.toShort) + } + + def createTestTxCoordinatorMetadataRequest: TxCoordinatorMetadataRequest = { + TxCoordinatorMetadataRequest("txGroup 1", clientId = "client 1") + } + + def createTestTxCoordinatorMetadataResponse: TxCoordinatorMetadataResponse = { + TxCoordinatorMetadataResponse(Some(brokers.head), ErrorMapping.NoError) + } + } class RequestResponseSerializationTest extends JUnitSuite { @@ -201,6 +220,10 @@ class RequestResponseSerializationTest extends JUnitSuite { private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode) + private val transactionRequest = SerializationTestUtils.createTestTransactionRequest + private val transactionResponse = SerializationTestUtils.createTestTransactionResponse + private val txCoordinatorMetadataRequest = SerializationTestUtils.createTestTxCoordinatorMetadataRequest + private val txCoordinatorMetadataResponse = SerializationTestUtils.createTestTxCoordinatorMetadataResponse @Test def testSerializationAndDeserialization() { @@ -214,7 +237,9 @@ class RequestResponseSerializationTest extends JUnitSuite { topicMetadataRequest, topicMetadataResponse, offsetCommitRequest, offsetCommitResponse, offsetFetchRequest, offsetFetchResponse, - consumerMetadataRequest, consumerMetadataResponse, consumerMetadataResponseNoCoordinator) + consumerMetadataRequest, consumerMetadataResponse, consumerMetadataResponseNoCoordinator, + transactionRequest, transactionResponse, + txCoordinatorMetadataRequest, txCoordinatorMetadataResponse) requestsAndResponses.foreach { original => val buffer = ByteBuffer.allocate(original.sizeInBytes)