From 7a576451ba2ee48c6ec4997781dd47da15784330 Mon Sep 17 00:00:00 2001
From: Dong Lin
Date: Fri, 8 Aug 2014 21:20:05 -0700
Subject: [PATCH] KAFKA-1522; Transactional messaging request/response definitions

---
 core/src/main/scala/kafka/api/RequestKeys.scala      |   6 +-
 .../main/scala/kafka/api/TransactionRequest.scala    | 184 +++++++++++++++++++++
 .../main/scala/kafka/api/TransactionResponse.scala   |  97 +++++++++++
 .../kafka/api/TxCoordinatorMetadataRequest.scala     |  79 +++++++++
 .../kafka/api/TxCoordinatorMetadataResponse.scala    |  59 +++++++
 .../src/main/scala/kafka/common/ErrorMapping.scala   |   1 +
 .../api/RequestResponseSerializationTest.scala       |  31 +++-
 7 files changed, 455 insertions(+), 2 deletions(-)
 create mode 100644 core/src/main/scala/kafka/api/TransactionRequest.scala
 create mode 100644 core/src/main/scala/kafka/api/TransactionResponse.scala
 create mode 100644 core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala
 create mode 100644 core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala

diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index c24c034..4a9b174 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -34,6 +34,8 @@ object RequestKeys {
   val ConsumerMetadataKey: Short = 10
   val JoinGroupKey: Short = 11
   val HeartbeatKey: Short = 12
+  val TransactionKey: Short = 13
+  val TxCoordinatorMetadataKey: Short = 14
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -48,7 +50,9 @@ object RequestKeys {
         OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
         ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
         JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
-        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom),
+        TransactionKey -> ("TransactionRequest", TransactionRequest.readFrom),
+        TxCoordinatorMetadataKey -> ("TxCoordinatorMetadata", TxCoordinatorMetadataRequest.readFrom)
     )
 
   def nameForKey(key: Short): String = {
diff --git a/core/src/main/scala/kafka/api/TransactionRequest.scala b/core/src/main/scala/kafka/api/TransactionRequest.scala
new file mode 100644
index 0000000..29df42c
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TransactionRequest.scala
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import kafka.api.ApiUtils._
+import kafka.common._
+import kafka.network.RequestChannel.Response
+import kafka.network.{RequestChannel, BoundedByteBufferSend}
+import collection.mutable.{LinkedHashMap, LinkedHashSet}
+
+
+object TransactionRequest {
+  val CurrentVersion = 0.shortValue
+
+  def readFrom(buffer: ByteBuffer): TransactionRequest = {
+    val versionId: Short = buffer.getShort
+    val correlationId: Int = buffer.getInt
+    val clientId: String = readShortString(buffer)
+    val ackTimeoutMs: Int = buffer.getInt
+    val requestInfo = TransactionRequestInfo.readFrom(buffer)
+
+    TransactionRequest(versionId, correlationId, clientId, ackTimeoutMs, requestInfo)
+  }
+
+  def transactionRequestWithNewControl(oldTxRequest: TransactionRequest, newTxControl: Short): TransactionRequest = {
+    oldTxRequest.copy(requestInfo = oldTxRequest.requestInfo.copy(txControl = newTxControl))
+  }
+}
+
+object TxRequestTypes {
+  val Ongoing: Short = 0
+  val Begin: Short = 1
+  val PreCommit: Short = 2
+  val Commit: Short = 3
+  val Committed: Short = 4
+  val PreAbort: Short = 5
+  val Abort: Short = 6
+  val Aborted: Short = 7
+}
+
+
+case class TransactionRequest(versionId: Short = TransactionRequest.CurrentVersion,
+                              correlationId: Int,
+                              clientId: String,
+                              ackTimeoutMs: Int,
+                              requestInfo: TransactionRequestInfo)
+  extends RequestOrResponse(Some(RequestKeys.TransactionKey)) {
+
+  def this(correlationId: Int,
+           clientId: String,
+           ackTimeoutMs: Int,
+           requestInfo: TransactionRequestInfo) =
+    this(TransactionRequest.CurrentVersion, correlationId, clientId, ackTimeoutMs, requestInfo)
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+    buffer.putInt(ackTimeoutMs)
+    requestInfo.writeTo(buffer)
+  }
+
+  def sizeInBytes: Int = {
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) + /* client id */
+    4 + /* ackTimeoutMs */
+    requestInfo.sizeInBytes
+  }
+
+  override def toString(): String = {
+    describe(true)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+
+    val transactionResponseStatus = requestInfo.txPartitions.map {
+      topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    val errorResponse = TransactionResponse(correlationId, requestInfo.txId, transactionResponseStatus.toMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  def responseFor(status: Map[TopicAndPartition, Short]) = {
+    TransactionResponse(correlationId, requestInfo.txId, status)
+  }
+
+  override def describe(details: Boolean): String = {
+    val transactionRequest = new StringBuilder
+    transactionRequest.append("Name: " + this.getClass.getSimpleName)
+    transactionRequest.append("; Version: " + versionId)
+    transactionRequest.append("; CorrelationId: " + correlationId)
+    transactionRequest.append("; ClientId: " + clientId)
+    transactionRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+    if (details)
+      transactionRequest.append("; requestInfos: " + requestInfo)
+    transactionRequest.toString()
+  }
+}
+
+object TransactionRequestInfo {
+
+  def readFrom(buffer: ByteBuffer): TransactionRequestInfo = {
+    val txGroupId: String = readShortString(buffer)
+    val txId: Int = buffer.getInt
+    val txControl: Short = buffer.getShort
+    val txTimeoutMs: Int = buffer.getInt
+
+    val topicCount = buffer.getInt
+    val txPartitions = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partition = buffer.getInt
+        TopicAndPartition(topic, partition)
+      })
+    }).toList
+
+    TransactionRequestInfo(txGroupId, txId, txControl, txTimeoutMs, txPartitions)
+  }
+}
+
+case class TransactionRequestInfo(txGroupId: String, txId: Int, txControl: Short, txTimeoutMs: Int,
+                                  txPartitions: Seq[TopicAndPartition]) {
+
+  private lazy val partitionsGroupedByTopic = txPartitions.groupBy(_.topic)
+
+  def sizeInBytes: Int = {
+    shortStringLength(txGroupId) + /* groupId */
+    4 + /* txId */
+    2 + /* txControl */
+    4 + /* txTimeoutMs */
+    4 + /* number of topics */
+    partitionsGroupedByTopic.foldLeft(0)((foldedTopics, topicAndPartitions) => {
+      foldedTopics +
+      shortStringLength(topicAndPartitions._1) + /* topic */
+      4 + /* number of partitions */
+      4 * topicAndPartitions._2.size /* partitions */
+    })
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    writeShortString(buffer, txGroupId)
+    buffer.putInt(txId)
+    buffer.putShort(txControl)
+    buffer.putInt(txTimeoutMs)
+    buffer.putInt(partitionsGroupedByTopic.size)
+    partitionsGroupedByTopic.foreach {
+      case (topic, topicAndPartitions) =>
+        writeShortString(buffer, topic) // write the topic
+        buffer.putInt(topicAndPartitions.size) // the number of partitions
+        topicAndPartitions.foreach { topicAndPartition: TopicAndPartition =>
+          buffer.putInt(topicAndPartition.partition)
+        }
+    }
+  }
+
+  override def toString(): String = {
+    val requestInfo = new StringBuilder
+    requestInfo.append("gId: " + txGroupId)
+    requestInfo.append("; txId: " + txId)
+    requestInfo.append("; txControl: " + txControl)
+    requestInfo.append("; txTimeoutMs: " + txTimeoutMs)
+    requestInfo.append("; TopicAndPartition: (" + txPartitions.mkString(",") + ")")
+    requestInfo.toString()
+  }
+
+}
diff --git a/core/src/main/scala/kafka/api/TransactionResponse.scala b/core/src/main/scala/kafka/api/TransactionResponse.scala
new file mode 100644
index 0000000..2a889c7
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TransactionResponse.scala
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import kafka.common.{TopicAndPartition, ErrorMapping}
+
+import scala.collection.Map
+
+object TransactionResponse {
+  def readFrom(buffer: ByteBuffer): TransactionResponse = {
+
+    val correlationId = buffer.getInt
+    val txId = buffer.getInt
+    val topicCount = buffer.getInt
+    val statusPairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partition = buffer.getInt
+        val error = buffer.getShort
+        (TopicAndPartition(topic, partition), error)
+      })
+    })
+    TransactionResponse(correlationId, txId, Map(statusPairs:_*))
+  }
+}
+
+
+case class TransactionResponse(correlationId: Int,
+                               txId: Int,
+                               status: Map[TopicAndPartition, Short])
+  extends RequestOrResponse() {
+
+  private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
+
+  def hasError = status.values.exists(_ != ErrorMapping.NoError)
+
+  val sizeInBytes = {
+    val groupedStatus = statusGroupedByTopic
+    4 + /* correlation id */
+    4 + /* transaction id */
+    4 + /* topic count */
+    groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
+      foldedTopics +
+      shortStringLength(currTopic._1) +
+      4 + /* partition count for this topic */
+      currTopic._2.size * {
+        4 + /* partition id */
+        2 /* error code */
+      }
+    })
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    val groupedStatus = statusGroupedByTopic
+    buffer.putInt(correlationId)
+    buffer.putInt(txId)
+    buffer.putInt(groupedStatus.size) // topic count
+
+    groupedStatus.foreach(topicStatus => {
+      val (topic, errors) = topicStatus
+      writeShortString(buffer, topic)
+      buffer.putInt(errors.size) // partition count
+      errors.foreach {
+        case (TopicAndPartition(_, partition), error) =>
+          buffer.putInt(partition)
+          buffer.putShort(error)
+      }
+    })
+  }
+
+  override def toString(): String = {
+    val requestInfo = new StringBuilder
+    requestInfo.append("txId: " + txId)
+    requestInfo.append("; status: " + status)
+    requestInfo.toString()
+  }
+
+  override def describe(details: Boolean): String = { toString }
+}
diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala
new file mode 100644
index 0000000..a154c57
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+import kafka.common.ErrorMapping
+
+object TxCoordinatorMetadataRequest {
+  val CurrentVersion = 0.shortValue
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer) = {
+    // envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = ApiUtils.readShortString(buffer)
+
+    // request
+    val txGroupId = ApiUtils.readShortString(buffer)
+    TxCoordinatorMetadataRequest(txGroupId, versionId, correlationId, clientId)
+  }
+
+}
+
+case class TxCoordinatorMetadataRequest(txGroupId: String,
+                                        versionId: Short = TxCoordinatorMetadataRequest.CurrentVersion,
+                                        correlationId: Int = 0,
+                                        clientId: String = TxCoordinatorMetadataRequest.DefaultClientId)
+  extends RequestOrResponse(Some(RequestKeys.TxCoordinatorMetadataKey)) {
+
+  def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    ApiUtils.shortStringLength(clientId) +
+    ApiUtils.shortStringLength(txGroupId)
+
+  def writeTo(buffer: ByteBuffer) {
+    // envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    ApiUtils.writeShortString(buffer, clientId)
+
+    // transaction coordinator metadata request
+    ApiUtils.writeShortString(buffer, txGroupId)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    // return TransactionCoordinatorNotAvailable for all uncaught errors, echoing the request's correlation id
+    val errorResponse = TxCoordinatorMetadataResponse(None, ErrorMapping.TxCoordinatorNotAvailableCode, correlationId)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  def describe(details: Boolean) = {
+    val transactionMetadataRequest = new StringBuilder
+    transactionMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+    transactionMetadataRequest.append("; Version: " + versionId)
+    transactionMetadataRequest.append("; CorrelationId: " + correlationId)
+    transactionMetadataRequest.append("; ClientId: " + clientId)
+    transactionMetadataRequest.append("; Group: " + txGroupId)
+    transactionMetadataRequest.toString()
+  }
+}
diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala
new file mode 100644
index 0000000..052b29b
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+
+object TxCoordinatorMetadataResponse {
+  val CurrentVersion = 0
+
+  private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1))
+
+  def readFrom(buffer: ByteBuffer) = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    // writeTo always serializes a broker (a dummy one on error), so consume it unconditionally
+    val broker = Broker.readFrom(buffer)
+    val coordinatorOpt = if (errorCode == ErrorMapping.NoError) Some(broker) else None
+
+    TxCoordinatorMetadataResponse(coordinatorOpt, errorCode, correlationId)
+  }
+
+}
+
+case class TxCoordinatorMetadataResponse(coordinatorOpt: Option[Broker],
+                                         errorCode: Short,
+                                         correlationId: Int = 0)
+  extends RequestOrResponse() {
+
+  def sizeInBytes =
+    4 + /* correlationId */
+    2 + /* error code */
+    coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).get.sizeInBytes
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer))
+  }
+
+  def describe(details: Boolean) = toString
+}
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 5559d26..b185421 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -46,6 +46,7 @@ object ErrorMapping {
   val OffsetsLoadInProgressCode: Short = 14
   val ConsumerCoordinatorNotAvailableCode: Short = 15
   val NotCoordinatorForConsumerCode: Short = 16
+  val TxCoordinatorNotAvailableCode: Short = 17
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..e3bb7a6 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
@@ -216,6 +217,27 @@ object SerializationTestUtils {
     val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1)))
     JoinGroupResponseAndHeader(1, body)
   }
+
+  def createTestTransactionRequest: TransactionRequest = {
+    new TransactionRequest(1, "client 1", 1000,
+      TransactionRequestInfo("group 1", 1, 1, 1000, Seq(
+        TopicAndPartition(topic1, 0), TopicAndPartition(topic2, 0)
+      )))
+  }
+
+  def createTestTransactionResponse: TransactionResponse = {
+    val responseMap = Map((TopicAndPartition(topic1, 0), ErrorMapping.NoError),
+      (TopicAndPartition(topic2, 0), ErrorMapping.NoError))
+    TransactionResponse(1, 1, responseMap)
+  }
+
+  def createTestTxCoordinatorMetadataRequest: TxCoordinatorMetadataRequest = {
+    TxCoordinatorMetadataRequest("txGroup 1", clientId = "client 1")
+  }
+
+  def createTestTxCoordinatorMetadataResponse: TxCoordinatorMetadataResponse = {
+    TxCoordinatorMetadataResponse(Some(brokers.head), ErrorMapping.NoError)
+  }
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -242,6 +264,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader
   private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader
   private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader
+  private val transactionRequest = SerializationTestUtils.createTestTransactionRequest
+  private val transactionResponse = SerializationTestUtils.createTestTransactionResponse
+  private val txCoordinatorMetadataRequest = SerializationTestUtils.createTestTxCoordinatorMetadataRequest
+  private val txCoordinatorMetadataResponse = SerializationTestUtils.createTestTxCoordinatorMetadataResponse
 
   @Test
   def testSerializationAndDeserialization() {
@@ -254,7 +280,10 @@
                                     offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
                                     consumerMetadataRequest, consumerMetadataResponse,
                                     consumerMetadataResponseNoCoordinator, heartbeatRequest,
-                                    heartbeatResponse, joinGroupRequest, joinGroupResponse)
+                                    heartbeatResponse, joinGroupRequest, joinGroupResponse,
+                                    transactionRequest, transactionResponse,
+                                    txCoordinatorMetadataRequest, txCoordinatorMetadataResponse)
+
 
     requestsAndResponses.foreach { original =>
       val buffer = ByteBuffer.allocate(original.sizeInBytes)
-- 
1.7.12.4
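
Note (not part of the patch): the snippet below is a minimal usage sketch of how the new wire
classes round-trip through their serialized form once this patch is applied, mirroring the added
case in RequestResponseSerializationTest. It assumes kafka core with this patch on the classpath
(e.g. in a Scala REPL); the group, client, and topic names are made-up example values.

  import java.nio.ByteBuffer
  import kafka.api.{TransactionRequest, TransactionRequestInfo, TxRequestTypes}
  import kafka.common.TopicAndPartition

  // Build a BEGIN control request covering a single partition (illustrative values only).
  val info = TransactionRequestInfo("tx-group-1", 42, TxRequestTypes.Begin, 30000,
    Seq(TopicAndPartition("topic-1", 0)))
  val request = new TransactionRequest(1, "client-1", 1000, info)

  // Serialize into an exactly-sized buffer and read it back, as the unit test does.
  val buffer = ByteBuffer.allocate(request.sizeInBytes)
  request.writeTo(buffer)
  buffer.rewind()
  val decoded = TransactionRequest.readFrom(buffer)
  assert(decoded == request)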