diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index fbfc9d3..0ee6ea0 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -32,6 +32,8 @@ object RequestKeys { val OffsetCommitKey: Short = 8 val OffsetFetchKey: Short = 9 val ConsumerMetadataKey: Short = 10 + val TransactionKey: Short = 11 + val TxCoordinatorMetadataKey: Short = 12 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), @@ -44,7 +46,9 @@ object RequestKeys { ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom)) + ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom), + TransactionKey -> ("TransactionRequest", TransactionRequest.readFrom), + TxCoordinatorMetadataKey -> ("TxCoordinatorMetadata", TxCoordinatorMetadataRequest.readFrom)) def nameForKey(key: Short): String = { keyToNameAndDeserializerMap.get(key) match { diff --git a/core/src/main/scala/kafka/api/TransactionRequest.scala b/core/src/main/scala/kafka/api/TransactionRequest.scala new file mode 100644 index 0000000..e1a51c2 --- /dev/null +++ b/core/src/main/scala/kafka/api/TransactionRequest.scala @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio._ +import kafka.api.ApiUtils._ +import kafka.common._ +import kafka.network.RequestChannel.Response +import kafka.network.{RequestChannel, BoundedByteBufferSend} +import collection.mutable.{LinkedHashMap, LinkedHashSet} + + +object TransactionRequest { + val CurrentVersion = 0.shortValue + + def readFrom(buffer: ByteBuffer): TransactionRequest = { + val versionId: Short = buffer.getShort + val correlationId: Int = buffer.getInt + val clientId: String = readShortString(buffer) + val ackTimeoutMs: Int = buffer.getInt + val requestInfo = TransactionRequestInfo.readFrom(buffer) + + TransactionRequest(versionId, correlationId, clientId, ackTimeoutMs, requestInfo) + } + + def TransactionWithNewTxId(oldTxRequest: TransactionRequest, txId: Int) = { + TransactionRequest( + oldTxRequest.versionId, + oldTxRequest.correlationId, + oldTxRequest.clientId, + oldTxRequest.ackTimeoutMs, + TransactionRequestInfo( + oldTxRequest.requestInfo.groupId, + txId, + oldTxRequest.requestInfo.txControl, + oldTxRequest.requestInfo.txTimeoutMs, + oldTxRequest.requestInfo.txPartitions + )) + } + + def TransactionWithUpdatedPartition(oldTxRequest: TransactionRequest, partition: TopicAndPartition) = { + TransactionRequest( + oldTxRequest.versionId, + oldTxRequest.correlationId, + oldTxRequest.clientId, + oldTxRequest.ackTimeoutMs, + TransactionRequestInfo( + oldTxRequest.requestInfo.groupId, + oldTxRequest.requestInfo.txId, + oldTxRequest.requestInfo.txControl, + oldTxRequest.requestInfo.txTimeoutMs, + partition :: (oldTxRequest.requestInfo.txPartitions.filter(_ != partition)) + )) + } + + def TransactionWithNewControl(oldTxRequest: TransactionRequest, txControl: Short): TransactionRequest = { + TransactionRequest( + oldTxRequest.versionId, + oldTxRequest.correlationId, + oldTxRequest.clientId, + oldTxRequest.ackTimeoutMs, + TransactionRequestInfo( + oldTxRequest.requestInfo.groupId, + oldTxRequest.requestInfo.txId, + txControl, + oldTxRequest.requestInfo.txTimeoutMs, + oldTxRequest.requestInfo.txPartitions + )) + } +} + +object TransactionControlKeys { + val Begin: Short = 0 + val PreCommit: Short = 1 + val Commit: Short = 2 + val Committed: Short = 3 + val PreAbort: Short = 4 + val Abort: Short = 5 + val Aborted: Short = 6 +} + + +case class TransactionRequest(versionId: Short = TransactionRequest.CurrentVersion, + override val correlationId: Int, + clientId: String, + ackTimeoutMs: Int, + requestInfo: TransactionRequestInfo) + extends RequestOrResponse(Some(RequestKeys.TransactionKey), correlationId) { + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + buffer.putInt(correlationId) + writeShortString(buffer, clientId) + buffer.putInt(ackTimeoutMs) + requestInfo.writeTo(buffer) + } + + def sizeInBytes: Int = { + 2 + /* versionId */ + 4 + /* correlationId */ + shortStringLength(clientId) + /* client id */ + 4 + /* ackTimeoutMs */ + requestInfo.sizeInBytes + } + + override def toString(): String = { + describe(true) + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val errorResponse = TransactionResponse(correlationId, requestInfo.txId, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } + + def responseFor(errorCode: Short) = { + TransactionResponse(correlationId, requestInfo.txId, errorCode); + } + + override def describe(details: Boolean): String = { + val producerRequest = new StringBuilder + producerRequest.append("Name: " + this.getClass.getSimpleName) + producerRequest.append("; Version: " + versionId) + producerRequest.append("; CorrelationId: " + correlationId) + producerRequest.append("; ClientId: " + clientId) + producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") + if (details) + producerRequest.append("; requestInfos: " + requestInfo) + producerRequest.toString() + } +} + +object TransactionRequestInfo { + + def readFrom(buffer: ByteBuffer): TransactionRequestInfo = { + val groupId: String = readShortString(buffer) + val txId: Int = buffer.getInt + val txControl: Short = buffer.getShort + val txTimeoutMs: Int = buffer.getInt + + val topicCount = buffer.getInt + val txPartitions = (1 to topicCount).flatMap(_ => { + val topic = readShortString(buffer) + val partitionCount = buffer.getInt + (1 to partitionCount).map(_ => { + val partition = buffer.getInt + TopicAndPartition(topic, partition) + }) + }).toList + + TransactionRequestInfo(groupId, txId, txControl, txTimeoutMs, txPartitions) + } +} + +case class TransactionRequestInfo(groupId: String, txId: Int, txControl: Short, txTimeoutMs: Int, + txPartitions: collection.immutable.List[TopicAndPartition]) { + + private lazy val partitionGroupedByTopicOrdered = { + val map = LinkedHashMap[String, LinkedHashSet[TopicAndPartition] ]() + for (txPartition <- txPartitions) { + val key = txPartition.topic + map(key) = map.lift(key).getOrElse(LinkedHashSet()) + txPartition + } + map + } + + def getTopicPartitionToAppendBroker: Option[TopicAndPartition] = { + txPartitions.headOption + } + + def sizeInBytes: Int = { + shortStringLength(groupId) + /* groupId */ + 4 + /* txId */ + 2 + /* txControl */ + 4 + /* txTimeoutMs */ + 4 + /* number of topics */ + partitionGroupedByTopicOrdered.foldLeft(0)((foldedTopics, currTopic) => { + foldedTopics + + shortStringLength(currTopic._1) + /* topic */ + 4 + /* number of partitions */ + 4 * currTopic._2.size /* partitions */ + }) + } + + def writeTo(buffer: ByteBuffer) { + writeShortString(buffer, groupId) + buffer.putInt(txId) + buffer.putShort(txControl) + buffer.putInt(txTimeoutMs) + buffer.putInt(partitionGroupedByTopicOrdered.size) + partitionGroupedByTopicOrdered.foreach { + case (topic, topicAndPartitions) => + writeShortString(buffer, topic) //write the topic + buffer.putInt(topicAndPartitions.size) //the number of partitions + topicAndPartitions.foreach {topicAndPartition: TopicAndPartition => + buffer.putInt(topicAndPartition.partition) + } + } + } + + override def toString(): String = { + val requestInfo = new StringBuilder + requestInfo.append("gId: " + groupId) + requestInfo.append("; txId: " + txId) + requestInfo.append("; txControl: " + txControl) + requestInfo.append("; txTimeoutMs: " + txTimeoutMs) + requestInfo.append("; TopicAndPartition: (" + txPartitions.mkString(",") + ")") + requestInfo.toString() + } + +} diff --git a/core/src/main/scala/kafka/api/TransactionResponse.scala b/core/src/main/scala/kafka/api/TransactionResponse.scala new file mode 100644 index 0000000..8cc03c9 --- /dev/null +++ b/core/src/main/scala/kafka/api/TransactionResponse.scala @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.common.ErrorMapping + +object TransactionResponse { + def readFrom(buffer: ByteBuffer): TransactionResponse = { + val correlationId = buffer.getInt + val txId = buffer.getInt + val error = buffer.getShort + + TransactionResponse(correlationId, txId, error) + } +} + + +case class TransactionResponse(override val correlationId: Int, + txId: Int, + var error: Short) + extends RequestOrResponse(correlationId = correlationId) { + + def hasError = error != ErrorMapping.NoError + + val sizeInBytes = { + 4 + /* correlation id */ + 4 + /* TxId */ + 2 /* error code */ + } + + def writeTo(buffer: ByteBuffer) { + buffer.putInt(correlationId) + buffer.putInt(txId) + buffer.putShort(error) + } + + override def describe(details: Boolean):String = { toString } +} + diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala new file mode 100644 index 0000000..73a1767 --- /dev/null +++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.network.RequestChannel.Response +import kafka.common.ErrorMapping + +object TxCoordinatorMetadataRequest { + val CurrentVersion = 0.shortValue + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer) = { + // envelope + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = ApiUtils.readShortString(buffer) + + // request + val groupId = ApiUtils.readShortString(buffer) + TxCoordinatorMetadataRequest(groupId, versionId, correlationId, clientId) + } + +} + +case class TxCoordinatorMetadataRequest(groupId: String, + versionId: Short = TxCoordinatorMetadataRequest.CurrentVersion, + override val correlationId: Int = 0, + clientId: String = TxCoordinatorMetadataRequest.DefaultClientId) + extends RequestOrResponse(Some(RequestKeys.TxCoordinatorMetadataKey), correlationId) { + + def sizeInBytes = + 2 + /* versionId */ + 4 + /* correlationId */ + ApiUtils.shortStringLength(clientId) + + ApiUtils.shortStringLength(groupId) + + def writeTo(buffer: ByteBuffer) { + // envelope + buffer.putShort(versionId) + buffer.putInt(correlationId) + ApiUtils.writeShortString(buffer, clientId) + + // transaction coordinator metadata request + ApiUtils.writeShortString(buffer, groupId) + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + // return TransactionCoordinatorNotAvailable for all uncaught errors + val errorResponse = TxCoordinatorMetadataResponse(None, ErrorMapping.TransactionCoordinatorNotAvailableCode) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } + + def describe(details: Boolean) = { + val transactionMetadataRequest = new StringBuilder + transactionMetadataRequest.append("Name: " + this.getClass.getSimpleName) + transactionMetadataRequest.append("; Version: " + versionId) + transactionMetadataRequest.append("; CorrelationId: " + correlationId) + transactionMetadataRequest.append("; ClientId: " + clientId) + transactionMetadataRequest.append("; Group: " + groupId) + transactionMetadataRequest.toString() + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala new file mode 100644 index 0000000..06a4cfe --- /dev/null +++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.cluster.Broker +import kafka.common.ErrorMapping + +object TxCoordinatorMetadataResponse { + val CurrentVersion = 0 + + private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1)) + + def readFrom(buffer: ByteBuffer) = { + val correlationId = buffer.getInt + val errorCode = buffer.getShort + val coordinatorOpt = if (errorCode == ErrorMapping.NoError) + Some(Broker.readFrom(buffer)) + else + None + + TxCoordinatorMetadataResponse(coordinatorOpt, errorCode, correlationId) + } + +} + +case class TxCoordinatorMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, override val correlationId: Int = 0) + extends RequestOrResponse(correlationId = correlationId) { + + def sizeInBytes = + 4 + /* correlationId */ + 2 + /* error code */ + coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).get.sizeInBytes + + def writeTo(buffer: ByteBuffer) { + buffer.putInt(correlationId) + buffer.putShort(errorCode) + coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer)) + } + + def describe(details: Boolean) = toString +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 5559d26..703cca4 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -46,6 +46,7 @@ object ErrorMapping { val OffsetsLoadInProgressCode: Short = 14 val ConsumerCoordinatorNotAvailableCode: Short = 15 val NotCoordinatorForConsumerCode: Short = 16 + val TransactionCoordinatorNotAvailableCode: Short = 17 private val exceptionToCode = Map[Class[Throwable], Short](