From 174d5b97888cb74879a7d67756525141c5671c6b Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Fri, 8 Aug 2014 21:35:33 -0700 Subject: [PATCH] KAFKA-1523; Transaction manager and its failover handling --- core/src/main/scala/kafka/admin/TopicCommand.scala | 8 +- core/src/main/scala/kafka/cluster/Partition.scala | 7 +- core/src/main/scala/kafka/common/Topic.scala | 4 +- .../controller/ControllerChannelManager.scala | 30 +- core/src/main/scala/kafka/message/Message.scala | 82 +++-- core/src/main/scala/kafka/server/KafkaApis.scala | 259 +++++++++++++- core/src/main/scala/kafka/server/KafkaConfig.scala | 33 ++ core/src/main/scala/kafka/server/KafkaServer.scala | 25 +- .../main/scala/kafka/server/ReplicaManager.scala | 22 +- .../main/scala/kafka/server/RequestPurgatory.scala | 6 + .../scala/kafka/server/TransactionManager.scala | 385 +++++++++++++++++++++ core/src/main/scala/kafka/utils/ZkUtils.scala | 60 ++++ .../scala/unit/kafka/server/SimpleFetchTest.scala | 8 +- 13 files changed, 870 insertions(+), 59 deletions(-) create mode 100644 core/src/main/scala/kafka/server/TransactionManager.scala diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 003a09c..b94ae26 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -26,8 +26,8 @@ import scala.collection.JavaConversions._ import kafka.cluster.Broker import kafka.log.LogConfig import kafka.consumer.Whitelist -import kafka.server.OffsetManager - +import kafka.server.{TransactionManager, OffsetManager} +import kafka.common.Topic object TopicCommand { @@ -106,8 +106,8 @@ object TopicCommand { println("Updated config for topic \"%s\".".format(topic)) } if(opts.options.has(opts.partitionsOpt)) { - if (topic == OffsetManager.OffsetsTopicName) { - throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") + if (Topic.InternalTopics.contains(topic)) { + throw new IllegalArgumentException("The number of partitions for the " + topic + " topic cannot be changed.") } println("WARNING: If partitions are increased for a topic that has a key, the partition " + "logic or ordering of the messages will be affected") diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 134aef9..9cd01e1 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -21,7 +21,7 @@ import kafka.admin.AdminUtils import kafka.utils._ import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{OffsetManager, ReplicaManager} +import kafka.server.{TransactionManager, OffsetManager, ReplicaManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -166,7 +166,8 @@ class Partition(val topic: String, */ def makeLeader(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int, - offsetManager: OffsetManager): Boolean = { + offsetManager: OffsetManager, + newTxPartitions: mutable.Set[Int]): Boolean = { inWriteLock(leaderIsrUpdateLock) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -189,6 +190,8 @@ class Partition(val topic: String, maybeIncrementLeaderHW(getReplica().get) if (topic == OffsetManager.OffsetsTopicName) offsetManager.loadOffsetsFromLog(partitionId) + if (topic 
== TransactionManager.TransactionTopicName) + newTxPartitions.add(partitionId) true } } diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index ad75978..91d9052 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -18,7 +18,7 @@ package kafka.common import util.matching.Regex -import kafka.server.OffsetManager +import kafka.server.{TransactionManager, OffsetManager} object Topic { @@ -26,7 +26,7 @@ object Topic { private val maxNameLength = 255 private val rgx = new Regex(legalChars + "+") - val InternalTopics = Set(OffsetManager.OffsetsTopicName) + val InternalTopics = Set(OffsetManager.OffsetsTopicName, TransactionManager.TransactionTopicName) def validate(topic: String) { if (topic.length <= 0) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ecbfa0f..f5478f7 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -30,12 +30,15 @@ import kafka.common.TopicAndPartition import kafka.api.RequestOrResponse import collection.Set -class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging { +class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig, liveBrokers: Set[Broker] = null) extends Logging { private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " - controllerContext.liveBrokers.foreach(addNewBroker(_)) + if (liveBrokers == null) + controllerContext.liveBrokers.foreach(addNewBroker(_)) + else + liveBrokers.foreach(addNewBroker(_)) def startup() = { brokerLock synchronized { @@ -89,6 +92,19 @@ class ControllerChannelManager (private val controllerContext: ControllerContext brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread)) } + def getRemainingRequestAndCallback(brokerId: Int): List[Any] = { + brokerLock synchronized { + val brokerState = brokerStateInfo(brokerId) + brokerState.requestSendThread.shutdown() + val items = brokerState.messageQueue.toArray.toList + val curItem = brokerState.requestSendThread.curQueueItem + if (curItem == null) + items + else + curItem:: items + } + } + private def removeExistingBroker(brokerId: Int) { try { brokerStateInfo(brokerId).channel.disconnect() @@ -115,12 +131,13 @@ class RequestSendThread(val controllerId: Int, extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) { private val lock = new Object() private val stateChangeLogger = KafkaController.stateChangeLogger + var curQueueItem: (RequestOrResponse, (RequestOrResponse) => Unit) = null connectToBroker(toBroker, channel) override def doWork(): Unit = { - val queueItem = queue.take() - val request = queueItem._1 - val callback = queueItem._2 + curQueueItem = queue.take() + val request = curQueueItem._1 + val callback = curQueueItem._2 var receive: Receive = null try { lock synchronized { @@ -152,6 +169,8 @@ class RequestSendThread(val controllerId: Int, response = StopReplicaResponse.readFrom(receive.buffer) case RequestKeys.UpdateMetadataKey => response = UpdateMetadataResponse.readFrom(receive.buffer) + case RequestKeys.TransactionKey => + 
response = TransactionResponse.readFrom(receive.buffer) } stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString)) @@ -159,6 +178,7 @@ class RequestSendThread(val controllerId: Int, if(callback != null) { callback(response) } + curQueueItem = null } } catch { case e: Throwable => diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index d2a7293..feb92ba 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -33,9 +33,11 @@ object Message { val CrcLength = 4 val MagicOffset = CrcOffset + CrcLength val MagicLength = 1 + val TxIdLength = 4 val AttributesOffset = MagicOffset + MagicLength val AttributesLength = 1 - val KeySizeOffset = AttributesOffset + AttributesLength + val TxIdOffset = AttributesOffset + AttributesLength + val KeySizeOffset = TxIdOffset + TxIdLength val KeySizeLength = 4 val KeyOffset = KeySizeOffset + KeySizeLength val ValueSizeLength = 4 @@ -46,7 +48,7 @@ object Message { /** * The minimum valid size for the message header */ - val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength + val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength + TxIdLength /** * The current "magic" value @@ -60,6 +62,16 @@ object Message { val CompressionCodeMask: Int = 0x07 /** + * Specifies the mask for the transaction control. 3 bits to hold the transaction control type. + * 0 is reserved to indicate a non-transactional message + */ + val TransactionControlMask: Int = 0x07 << 3 + + /** + * Specifies the offset for the transaction control within attributes. + */ + val TransactionControlOffset: Int = 3 + /** * Compression code for uncompressed messages */ val NoCompression: Int = 0 @@ -76,6 +88,7 @@ object Message { * 5. K byte key * 6. 4 byte payload length, containing length V * 7. V byte payload + * 8. 4 byte txId (stored between the attributes byte and the key length) * * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents.
*/ @@ -91,27 +104,33 @@ class Message(val buffer: ByteBuffer) { * @param payloadOffset The offset into the payload array used to extract payload * @param payloadSize The size of the payload to use */ - def this(bytes: Array[Byte], - key: Array[Byte], - codec: CompressionCodec, - payloadOffset: Int, - payloadSize: Int) = { - this(ByteBuffer.allocate(Message.CrcLength + - Message.MagicLength + - Message.AttributesLength + - Message.KeySizeLength + - (if(key == null) 0 else key.length) + - Message.ValueSizeLength + - (if(bytes == null) 0 - else if(payloadSize >= 0) payloadSize - else bytes.length - payloadOffset))) + def this(bytes: Array[Byte], + key: Array[Byte], + codec: CompressionCodec, + payloadOffset: Int, + payloadSize: Int, + txId: Int, + txControl: Short) = { + this(ByteBuffer.allocate(Message.CrcLength + + Message.MagicLength + + Message.AttributesLength + + Message.KeySizeLength + + (if(key == null) 0 else key.length) + + Message.ValueSizeLength + + (if(bytes == null) 0 + else if(payloadSize >= 0) payloadSize + else bytes.length - payloadOffset) + + Message.TxIdLength)) // skip crc, we will fill that in at the end buffer.position(MagicOffset) buffer.put(CurrentMagicValue) var attributes: Byte = 0 - if (codec.codec > 0) - attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte + if (codec.codec > 0 || txControl > 0) + attributes = (attributes | (CompressionCodeMask & codec.codec) + | (TransactionControlMask & (txControl << TransactionControlOffset))).toByte + buffer.put(attributes) + buffer.putInt(txId) if(key == null) { buffer.putInt(-1) } else { @@ -130,22 +149,25 @@ class Message(val buffer: ByteBuffer) { Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum) } - def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) = - this(bytes = bytes, key = key, codec = codec, payloadOffset = 0, payloadSize = -1) + def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) = + this(bytes = bytes, key = key, codec = codec, payloadOffset = 0, payloadSize = -1, txId = -1, txControl = 0) def this(bytes: Array[Byte], codec: CompressionCodec) = this(bytes = bytes, key = null, codec = codec) - def this(bytes: Array[Byte], key: Array[Byte]) = + def this(bytes: Array[Byte], key: Array[Byte]) = this(bytes = bytes, key = key, codec = NoCompressionCodec) - + def this(bytes: Array[Byte]) = this(bytes = bytes, key = null, codec = NoCompressionCodec) - + + def this(bytes: Array[Byte], key: Array[Byte], txId: Int, txControl: Short) = + this(bytes = bytes, key = key, codec = NoCompressionCodec, payloadOffset = 0, payloadSize = -1, txId = txId, txControl = txControl) + /** * Compute the checksum of the message from the message contents */ - def computeChecksum(): Long = + def computeChecksum(): Long = Utils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset) /** @@ -211,7 +233,12 @@ class Message(val buffer: ByteBuffer) { */ def compressionCodec: CompressionCodec = CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask) - + + /** + * The transaction control used with this message + */ + def txControl: Short = ((buffer.get(AttributesOffset) & TransactionControlMask) >> TransactionControlOffset).toShort + /** * A ByteBuffer containing the content of the message */ @@ -223,6 +250,11 @@ class Message(val buffer: ByteBuffer) { def key: ByteBuffer = sliceDelimited(KeySizeOffset) /** + * A ByteBuffer containing the txId of the message + */ + def txId: Int = buffer.getInt(TxIdOffset) + + /** * Read a 
size-delimited byte buffer starting at the given offset */ private def sliceDelimited(start: Int): ByteBuffer = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fd5f12e..a86e327 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -18,6 +18,7 @@ package kafka.server import kafka.api._ +import kafka.cluster.Broker import kafka.common._ import kafka.log._ import kafka.message._ @@ -27,24 +28,27 @@ import kafka.metrics.KafkaMetricsGroup import kafka.network.RequestChannel.Response import kafka.controller.KafkaController import kafka.utils.{Pool, SystemTime, Logging} - -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic._ import scala.collection._ - import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.common.KafkaException /** + * * Logic to handle the various Kafka requests */ class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, val offsetManager: OffsetManager, + val transactionManager: TransactionManager, val zkClient: ZkClient, val brokerId: Int, val config: KafkaConfig, val controller: KafkaController) extends Logging { + private val transactionRequestPurgatory = + new TransactionRequestPurgatory(config.transactionPurgatoryPurgeIntervalRequests) private val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests) private val fetchRequestPurgatory = @@ -71,6 +75,8 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request) case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request) + case RequestKeys.TransactionKey => handleTransactionRequest(request) + case RequestKeys.TxCoordinatorMetadataKey => handleTxCoordinatorMetadataRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { @@ -87,7 +93,9 @@ class KafkaApis(val requestChannel: RequestChannel, // stop serving data to clients for the topic being deleted val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] try { - val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) + val newTxPartitions = mutable.Set.empty[Int] + val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager, newTxPartitions) + transactionManager.onTransactionManagerFailover(newTxPartitions, (txRequest: TransactionRequest) => watchAndSendRequests(txRequest)) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) } catch { @@ -111,7 +119,8 @@ class KafkaApis(val requestChannel: RequestChannel, def handleUpdateMetadataRequest(request: RequestChannel.Request) { val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest] replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache) - + transactionManager.maybeResendRequests((response: RequestOrResponse) => + transactionRequestPurgatory.update(response), metadataCache) val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) requestChannel.sendResponse(new Response(request, 
new BoundedByteBufferSend(updateMetadataResponse))) } @@ -143,6 +152,51 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def producerRequestFromTxRequestToBroker(txRequest: TransactionRequest): ProducerRequest = { + val byteBufferMessageSet =new ByteBufferMessageSet( + config.transactionsTopicCompressionCodec, + new Message( + bytes = TransactionManager.transactionRequestValue(txRequest.requestInfo), + key = null, + txId = txRequest.requestInfo.txId, + txControl = txRequest.requestInfo.txControl)) + + val localPartitions = txRequest.requestInfo.txPartitions.filter(topicAndPartition => + replicaManager.isLeaderReplicaLocal(topicAndPartition.topic, topicAndPartition.partition)) + val producerData = mutable.Map(localPartitions.map(topicAndPartition => topicAndPartition -> byteBufferMessageSet): _*) + val request = ProducerRequest( + correlationId = txRequest.correlationId, + clientId = txRequest.clientId, + requiredAcks = -1, + ackTimeoutMs = txRequest.ackTimeoutMs, + data = producerData) + trace("Created producer request %s for transaction request %s.".format(request, txRequest)) + request + } + + private def producerRequestFromTxRequestToCoordinator(txRequest: TransactionRequest) = { + val message = new Message( + bytes = TransactionManager.transactionRequestValue(txRequest.requestInfo), + key = null, + txId = txRequest.requestInfo.txId, + txControl = txRequest.requestInfo.txControl) + + val partition = transactionManager.partitionFor(txRequest.requestInfo.txGroupId) + + val producerData = mutable.Map( + TopicAndPartition(TransactionManager.TransactionTopicName, partition) -> + new ByteBufferMessageSet(config.transactionsTopicCompressionCodec, message)) + + val request = ProducerRequest( + correlationId = txRequest.correlationId, + clientId = txRequest.clientId, + requiredAcks = -1, + ackTimeoutMs = txRequest.ackTimeoutMs, + data = producerData) + trace("Created producer request %s for transaction request %s.".format(request, txRequest)) + request + } + private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = { val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map { case (topicAndPartition, offset) => @@ -167,6 +221,74 @@ class KafkaApis(val requestChannel: RequestChannel, request } +/** + * 1. on receiving CoordinatorBegin message + * - append to transactionTopic + * - wait for ack from followers, and respond + * + * 2. on receiving PreCommit message + * - append to transactionTopic + * - wait for ack from followers, and respond + * - send Commit message to brokers + * - wait for ack from brokers, and append Committed to transactionTopic + * + * 3. on receiving Commit message + * - append to topicPartition + * - wait for ack from followers, and respond + * + * 4. 
on receiving Abort message + - append to topicPartition + - wait for ack from followers, and respond + */ + + def handleTransactionRequest(request: RequestChannel.Request) { + var txRequest = request.requestObj.asInstanceOf[TransactionRequest] + val txControl = txRequest.requestInfo.txControl + if (txControl == TxRequestTypes.Begin) { + val newTxId = transactionManager.getNextTransactionId + txRequest = txRequest.copy(requestInfo = txRequest.requestInfo.copy(txId = newTxId)) + } + val producerRequest = txControl match { + case TxRequestTypes.Begin | TxRequestTypes.PreCommit | TxRequestTypes.PreAbort => + producerRequestFromTxRequestToCoordinator(txRequest) + case TxRequestTypes.Commit | TxRequestTypes.Abort => + producerRequestFromTxRequestToBroker(txRequest) + case _ => throw new KafkaException("Unhandled Transaction Control Key %d".format(txControl)) + } + val localProduceResults = appendToLocalLog(producerRequest) + + // create a list of (topic, partition) pairs to use as keys for this delayed request + val producerRequestKeys = producerRequest.data.keys.map( + topicAndPartition => new RequestKey(topicAndPartition)).toSeq + val statuses = localProduceResults.map(r => + r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap + val delayedRequest = new DelayedProduce( + producerRequestKeys, + request, + statuses, + producerRequest, + producerRequest.ackTimeoutMs.toLong, + None, + Some(txRequest)) + + producerRequestPurgatory.watch(delayedRequest) + + /* + * Replica fetch requests may have arrived (and potentially satisfied) + * delayedProduce requests while they were being added to the purgatory. + * Here, we explicitly check if any of them can be satisfied. + */ + var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] + producerRequestKeys.foreach(key => + satisfiedProduceRequests ++= + producerRequestPurgatory.update(key, key)) + debug(satisfiedProduceRequests.size + + " producer requests unblocked during produce to local log.") + satisfiedProduceRequests.foreach(_.respond()) + // we do not need the data anymore + producerRequest.emptyData() + } + /** * Handle a produce request or offset commit request (which is really a specialized producer request) */ @@ -238,7 +360,8 @@ class KafkaApis(val requestChannel: RequestChannel, statuses, produceRequest, produceRequest.ackTimeoutMs.toLong, - offsetCommitRequestOpt) + offsetCommitRequestOpt, + None) producerRequestPurgatory.watch(delayedRequest) @@ -559,7 +682,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (topics.size > 0 && topicResponses.size != topics.size) { val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => - if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) { + if (Topic.InternalTopics.contains(topic) || config.autoCreateTopicsEnable) { try { if (topic == OffsetManager.OffsetsTopicName) { AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor)) } + else if (topic == TransactionManager.TransactionTopicName) { + AdminUtils.createTopic(zkClient, topic, config.transactionsTopicPartitions, config.transactionsTopicReplicationFactor) + info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" + .format(topic, config.transactionsTopicPartitions, config.transactionsTopicReplicationFactor)) + } else { AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" @@ -612,6 +740,28 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } + + def handleTxCoordinatorMetadataRequest(request: RequestChannel.Request) { + val txCoordinatorMetadataRequest = request.requestObj.asInstanceOf[TxCoordinatorMetadataRequest] + val partition = transactionManager.partitionFor(txCoordinatorMetadataRequest.txGroupId) + // get metadata (and create the topic if necessary) + val transactionTopicMetadata = getTopicMetadata(Set(TransactionManager.TransactionTopicName)).head + + val errorResponse = TxCoordinatorMetadataResponse(None, ErrorMapping.TxCoordinatorNotAvailableCode, txCoordinatorMetadataRequest.correlationId) + + val response = + transactionTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata => + partitionMetadata.leader.map { leader => + TxCoordinatorMetadataResponse(Some(leader), ErrorMapping.NoError, txCoordinatorMetadataRequest.correlationId) + }.getOrElse(errorResponse) + }.getOrElse(errorResponse) + + trace("Sending transaction metadata %s for correlation id %d to client %s." 
+ .format(response, txCoordinatorMetadataRequest.correlationId, txCoordinatorMetadataRequest.clientId)) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } + + /* * Service the consumer metadata API */ @@ -641,6 +791,7 @@ class KafkaApis(val requestChannel: RequestChannel, debug("Shutting down.") fetchRequestPurgatory.shutdown() producerRequestPurgatory.shutdown() + transactionRequestPurgatory.shutdown() debug("Shut down complete.") } @@ -705,12 +856,86 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def watchAndSendRequests(txRequest: TransactionRequest) { + val newRequestOpt = txRequest.requestInfo.txControl match { + case TxRequestTypes.PreCommit => Some(TransactionRequest.transactionRequestWithNewControl(txRequest, TxRequestTypes.Commit)) + case TxRequestTypes.PreAbort => Some(TransactionRequest.transactionRequestWithNewControl(txRequest, TxRequestTypes.Abort)) + case _ => None + } + + newRequestOpt match { + case Some(newRequest) => + val delayedCommit = new DelayedCommit(txRequest) + transactionRequestPurgatory.watch(delayedCommit) + transactionManager.sendTxRequestToLeaders(newRequest, (response: RequestOrResponse) => + transactionRequestPurgatory.update(response), metadataCache) + case None => + } + } + + /* TODO 1) move appendToLocal() to replicaManager + TODO 2) move DelayedCommit and TransactionRequestPurgatory to TransactionManager + */ + class DelayedCommit(keys: Seq[Int], + val txRequest: TransactionRequest, + delayMs: Long) + extends DelayedRequest(keys, null, delayMs) with Logging{ + + def this(txRequest: TransactionRequest) = + this(List(txRequest.requestInfo.txId), txRequest, config.transactionsAckTimeoutMs.toLong) + + val partitions = new mutable.HashSet[TopicAndPartition] + partitions ++= txRequest.requestInfo.txPartitions + + def isSatisfied(response: RequestOrResponse) = { + val txResponse = response.asInstanceOf[TransactionResponse] + partitions --= txResponse.status.filter(_._2 == ErrorMapping.NoError).keys.toSeq + partitions.isEmpty + } + + def respond() { + val txRequestToLog = txRequest.requestInfo.txControl match { + case TxRequestTypes.PreCommit => + TransactionRequest.transactionRequestWithNewControl(txRequest, TxRequestTypes.Committed) + case TxRequestTypes.PreAbort => + TransactionRequest.transactionRequestWithNewControl(txRequest, TxRequestTypes.Aborted) + } + + val producerRequest = producerRequestFromTxRequestToCoordinator(txRequestToLog) + appendToLocalLog(producerRequest) + transactionManager.updateAckedRequest(txRequest) + } + } + + private [kafka] class TransactionRequestPurgatory(purgeInterval: Int) + extends RequestPurgatory[DelayedCommit, RequestOrResponse](brokerId, purgeInterval) { + + protected def checkSatisfied(response: RequestOrResponse, delayedCommit: DelayedCommit) = { + delayedCommit.isSatisfied(response) + } + + protected def expire(delayedCommit: DelayedCommit) { + remove(List(delayedCommit.txRequest.requestInfo.txId)) + watchAndSendRequests(delayedCommit.txRequest) + } + + def update(response: RequestOrResponse) { + val txId = response.asInstanceOf[TransactionResponse].txId + val satisfiedRequests = update(txId, response) + satisfiedRequests.foreach(request => { + request.respond() + remove(List(txId)) + }) + } + } + class DelayedProduce(keys: Seq[RequestKey], request: RequestChannel.Request, val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus], produce: ProducerRequest, delayMs: Long, - offsetCommitRequestOpt: 
Option[OffsetCommitRequest] = None) + offsetCommitRequestOpt: Option[OffsetCommitRequest] = None, + transactionRequestOpt : Option[TransactionRequest] = None) extends DelayedRequest(keys, request, delayMs) with Logging { // first update the acks pending variable according to the error code @@ -736,14 +961,24 @@ class KafkaApis(val requestChannel: RequestChannel, }.map(_._2.error).getOrElse(ErrorMapping.NoError) if (errorCode == ErrorMapping.NoError) { - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) + offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo)) + transactionRequestOpt.foreach(txRequest => + partitionStatus.foreach{ case (topicAndPartition, delayedStatus) => + transactionManager.updateUnackedRequest(txRequest, delayedStatus.requiredOffset)}) } - val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, config.offsetMetadataMaxSize)) - .getOrElse(ProducerResponse(produce.correlationId, responseStatus)) + val response: RequestOrResponse = + if (offsetCommitRequestOpt != None) offsetCommitRequestOpt.get.responseFor(errorCode, config.offsetMetadataMaxSize) + else if (transactionRequestOpt != None) transactionRequestOpt.get.responseFor(responseStatus.mapValues(_.error)) + else ProducerResponse(produce.correlationId, responseStatus) requestChannel.sendResponse(new RequestChannel.Response( request, new BoundedByteBufferSend(response))) + + // For preCommit/preAbort, forward commit/abort to partitions involved in the transaction + if (errorCode == ErrorMapping.NoError) { + if (transactionRequestOpt != None) watchAndSendRequests(transactionRequestOpt.get) + } } /** @@ -772,7 +1007,7 @@ class KafkaApis(val requestChannel: RequestChannel, (false, ErrorMapping.UnknownTopicOrPartitionCode) } if (errorCode != ErrorMapping.NoError) { - fetchPartitionStatus. acksPending = false + fetchPartitionStatus.acksPending = false fetchPartitionStatus.status.error = errorCode } else if (hasEnough) { fetchPartitionStatus.acksPending = false diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1a45f87..39d1cd1 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -315,4 +315,37 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off */ val deleteTopicEnable = props.getBoolean("delete.topic.enable", false) + /*********** Transaction management configuration ***********/ + + /* the purge interval (in number of requests) of the transaction request purgatory */ + val transactionPurgatoryPurgeIntervalRequests = props.getInt("transaction.purgatory.purge.interval.requests", 10000) + + /** Batch size for reading from the transaction requests when re-sending requests to leaders. */ + val transactionsLoadBufferSize = props.getIntInRange("transaction.load.buffer.size", + TransactionManagerConfig.DefaultLoadBufferSize, (1, Integer.MAX_VALUE)) + + /* Timeout value for sending transaction requests to leaders involved in the transaction. + * This is similar to the producer request timeout. */ + val transactionsAckTimeoutMs = props.getIntInRange("transaction.ack.timeout.ms", + TransactionManagerConfig.DefaultTransactionAckTimeoutMs, (1, Integer.MAX_VALUE)) + + /** The number of partitions for the transaction topic (should not change after deployment). 
*/ + val transactionsTopicPartitions: Int = props.getIntInRange("transaction.topic.num.partitions", + TransactionManagerConfig.DefaultTransactionsTopicNumPartitions, (1, Integer.MAX_VALUE)) + + /** The replication factor for the transaction topic (set higher to ensure availability). */ + val transactionsTopicReplicationFactor: Short = props.getShortInRange("transaction.topic.replication.factor", + TransactionManagerConfig.DefaultTransactionsTopicReplicationFactor, (1, Short.MaxValue)) + + /** Compression codec for the transaction topic. */ + val transactionsTopicCompressionCodec = props.getCompressionCodec("transaction.topic.compression.codec", + TransactionManagerConfig.DefaultTransactionsTopicCompressionCodec) + + /* the frequency with which we update the persistent record of the last pending txControl which acts as the transaction recovery point */ + val transactionFlushOffsetCheckpointIntervalMs = props.getIntInRange("transaction.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue)) + + /** The number of txIds allocated in each transaction id request from zookeeper. + * Changing this value may cause newly allocated txIds to overlap with txIds already assigned. */ + val transactionsIdBatchSize: Int = props.getIntInRange("transaction.id.batch.size", + TransactionManagerConfig.DefaultTransactionsIdBatchSize, (1, Integer.MAX_VALUE)) } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2871118..5c3f3ae 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -49,6 +49,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null var offsetManager: OffsetManager = null + var transactionManager: TransactionManager = null var kafkaHealthcheck: KafkaHealthcheck = null var topicConfigManager: TopicConfigManager = null var replicaManager: ReplicaManager = null @@ -101,9 +102,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg offsetManager = createOffsetManager() kafkaController = new KafkaController(config, zkClient, brokerState) + + /* start transaction manager */ + transactionManager = createTransactionManager() + + transactionManager.startup() /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) + apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, transactionManager, + zkClient, config.brokerId, config, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) @@ -257,6 +264,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(logManager.shutdown()) if(kafkaController != null) Utils.swallow(kafkaController.shutdown()) + if(transactionManager != null) + Utils.swallow(transactionManager.shutdown()) if(zkClient != null) Utils.swallow(zkClient.close()) @@ -312,6 +321,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg time = time) } + private def createTransactionManager(): TransactionManager = { + val transactionConfig = TransactionManagerConfig( + transactionTopicNumPartitions = config.transactionsTopicPartitions, + transactionLoadBufferSize =
config.transactionsLoadBufferSize, + transactionAckTimeoutMs = config.transactionsAckTimeoutMs, + transactionsIdBatchSize = config.transactionsIdBatchSize) + new TransactionManager(config = transactionConfig, + controller = kafkaController, + replicaManager = replicaManager, + scheduler = kafkaScheduler, + flushCheckpointMs = config.transactionFlushOffsetCheckpointIntervalMs, + zkClient = zkClient) + } + private def createOffsetManager(): OffsetManager = { val offsetManagerConfig = OffsetManagerConfig( maxMetadataSize = config.offsetMetadataMaxSize, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 897783c..d791e71 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -180,6 +180,18 @@ class ReplicaManager(val config: KafkaConfig, throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition)) } + def isLeaderReplicaLocal(topic: String, partitionId: Int): Boolean = { + val partitionOpt = getPartition(topic, partitionId) + partitionOpt match { + case None => + false + case Some(partition) => + partition.leaderReplicaIfLocal match { + case Some(leaderReplica) => true + case None => false + } + } } + def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica = { val partitionOpt = getPartition(topic, partitionId) partitionOpt match { @@ -220,7 +232,8 @@ class ReplicaManager(val config: KafkaConfig, } def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, - offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = { + offsetManager: OffsetManager, + newTxPartitions: mutable.Set[Int]): (collection.Map[(String, Int), Short], Short) = { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId, @@ -271,7 +284,7 @@ class ReplicaManager(val config: KafkaConfig, val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys) if (!partitionsTobeLeader.isEmpty) - makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager) + makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager, newTxPartitions) if (!partitionsToBeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager) @@ -301,7 +314,8 @@ class ReplicaManager(val config: KafkaConfig, private def makeLeaders(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], - offsetManager: OffsetManager) = { + offsetManager: OffsetManager, + newTxPartitions: mutable.Set[Int]) = { partitionState.foreach(state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-leader transition for partition %s") @@ -320,7 +334,7 @@ class ReplicaManager(val config: KafkaConfig, } // Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => - partition.makeLeader(controllerId, 
partitionStateInfo, correlationId, offsetManager)} + partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager, newTxPartitions)} } catch { case e: Throwable => diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 3d0ff1e..82aeeac 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -101,6 +101,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge expiredRequestReaper.enqueue(delayedRequest) } + def remove(keys: Seq[Any]) { + for (key <- keys) { + watchersForKey.remove(key) + } + } + /** * Update any watchers and return a list of newly satisfied requests. */ diff --git a/core/src/main/scala/kafka/server/TransactionManager.scala b/core/src/main/scala/kafka/server/TransactionManager.scala new file mode 100644 index 0000000..619fece --- /dev/null +++ b/core/src/main/scala/kafka/server/TransactionManager.scala @@ -0,0 +1,385 @@ +package kafka.server + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import kafka.log.FileMessageSet +import collection.JavaConversions._ +import kafka.api._ +import kafka.controller.{ControllerChannelManager, KafkaController} +import org.I0Itec.zkclient.{IZkChildListener, ZkClient} +import org.apache.kafka.common.protocol.types._ +import org.apache.kafka.common.protocol.types.Type._ +import kafka.common.{TopicAndPartition, KafkaException} +import java.nio.ByteBuffer +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import kafka.utils._ +import kafka.message.{ByteBufferMessageSet, NoCompressionCodec} + +case class TransactionManagerConfig (transactionTopicNumPartitions: Int, + transactionLoadBufferSize: Int, + transactionAckTimeoutMs: Int, + transactionsIdBatchSize: Int) + +object TransactionManagerConfig { + val DefaultTransactionsTopicNumPartitions = 1 + val DefaultTransactionsTopicCompressionCodec = NoCompressionCodec + val DefaultTransactionsTopicReplicationFactor = 1.toShort + val DefaultLoadBufferSize = 5*1024*1024 + val DefaultTransactionAckTimeoutMs = 5000 + val DefaultTransactionsIdBatchSize = 1000 +} + +case class TransactionRequestAndOffset(txRequest: TransactionRequest, nextOffset: Long) {} + +class TransactionManager(val config: TransactionManagerConfig, + val controller: KafkaController, + val replicaManager: ReplicaManager, + scheduler: Scheduler, + val flushCheckpointMs: Int, + val zkClient: ZkClient = null) extends Logging { + + var nextTxId = -1 + var txIdBatchEnd = -1 + val InitialTaskDelayMs = 30*1000 + val localClientId = replicaManager.config.brokerId.toString + + private val hasStarted = new AtomicBoolean(false) + private var channelManager: ControllerChannelManager = null + private var liveBrokerIdsUnderlying: Set[Int] = Set.empty + private val loadingPartitions = mutable.HashSet[Int]() + + // protect concurrent access to unackedRequestSets etc. 
+ private val recoveryOffsetLock = new Object + // for each transactionTopic partition, the high watermark from which to start replay + private val recoveryOffsets = mutable.Map[Int, Long]() + // for each transactionTopic partition, the set of txRequests not fully acknowledged by brokers involved in the transaction + private val unackedRequestSets = new mutable.HashMap[Int, mutable.HashSet[TransactionRequest]] + // for each transactionTopic partition, the earliest not-yet-acknowledged txRequest and respective next offset on the log + private val unackedRequestAndOffsetQueues = new mutable.HashMap[Int, mutable.Queue[TransactionRequestAndOffset]] + + // protect concurrent access to unsentRequestQueues + private val unsentRequestsLock = new Object + // for each topicAndPartition, the ordered list of transactionRequest to send + private val unsentRequestQueues = new mutable.HashMap[TopicAndPartition, mutable.Queue[TransactionRequest]] + // for each topicAndPartition, the set of transactionRequest to send + private val unsentRequestSets = new mutable.HashMap[TopicAndPartition, mutable.HashSet[TransactionRequest]] + + def startup() { + /* Checkpoint high watermark for transaction topicPartition in zookeeper*/ + hasStarted.set(true) + } + + def shutdown() { + checkpointRecoveryOffsets() + if(channelManager != null) { + channelManager.shutdown() + channelManager = null + } + hasStarted.set(false) + } + + // TODO refactor channelManager functionality out of controller + def onTransactionManagerFailover(partitions: mutable.Set[Int], watchAndSendCallback: (TransactionRequest) => Unit) { + if (!hasStarted.get) + return + if (channelManager == null) { + val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet + channelManager = new ControllerChannelManager(controller.controllerContext, controller.config, liveBrokers) + channelManager.startup() + zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener()) + + if(scheduler != null) { + scheduler.schedule("kafka-recovery-point-checkpoint", + checkpointRecoveryOffsets, + delay = InitialTaskDelayMs, + period = flushCheckpointMs, + TimeUnit.MILLISECONDS) + } + } + + val offsets = ZkUtils.getTransactionRecoveryOffset(zkClient) + recoveryOffsetLock synchronized { + partitions.foreach(partition => { + recoveryOffsets.put(partition, offsets.getOrElse(partition, 0)) + unackedRequestSets.put(partition, new mutable.HashSet[TransactionRequest]()) + unackedRequestAndOffsetQueues.put(partition, new mutable.Queue[TransactionRequestAndOffset]()) + val beginOffset = recoveryOffsets(partition) + val endOffset = getHighWatermark(partition) + loadUnackedRequestsFromLog(partition, beginOffset, endOffset, watchAndSendCallback) + }) + } + } + + private def getHighWatermark(partitionId: Int): Long = { + val partitionOpt = replicaManager.getPartition(TransactionManager.TransactionTopicName, partitionId) + + val hw = partitionOpt.map { partition => + partition.leaderReplicaIfLocal().map(_.highWatermark).getOrElse(-1L) + }.getOrElse(-1L) + + hw + } + + private def loadUnackedRequestsFromLog(partition: Int, beginOffset: Long, endOffset:Long, watchAndSendCallback: (TransactionRequest) => Unit) { + val topicPartition = TopicAndPartition(TransactionManager.TransactionTopicName, partition) + + loadingPartitions synchronized { + if (loadingPartitions.contains(partition)) { + info("Unacked requests load from %s already in progress.".format(topicPartition)) + } else { + loadingPartitions.add(partition) + scheduler.schedule(topicPartition.toString, 
loadOffsets) + } + } + + def loadOffsets() { + info("Loading offsets from " + topicPartition) + + val startMs = SystemTime.milliseconds + try { + replicaManager.logManager.getLog(topicPartition) match { + case Some(log) => + var currOffset = beginOffset + val buffer = ByteBuffer.allocate(config.transactionLoadBufferSize) + // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 + while (currOffset < endOffset && getHighWatermark(partition) != -1 && hasStarted.get()) { + buffer.clear() + val messages = log.read(currOffset, config.transactionLoadBufferSize).asInstanceOf[FileMessageSet] + messages.readInto(buffer, 0) + val messageSet = new ByteBufferMessageSet(buffer) + messageSet.foreach { msgAndOffset => + //require(msgAndOffset.message.key == null, "Transaction request entry key should be null") + require(msgAndOffset.message.payload != null, "Transaction request entry value should not be null") + val txRequestInfo = TransactionManager.readMessageValue(msgAndOffset.message.payload) + val txRequest = new TransactionRequest(0, localClientId, config.transactionAckTimeoutMs, txRequestInfo) + watchAndSendCallback(txRequest) + currOffset = msgAndOffset.nextOffset + } + } + if (hasStarted.get()) + info("Finished loading offsets from %s in %d milliseconds." + .format(topicPartition, SystemTime.milliseconds - startMs)) + case None => + warn("No log found for " + topicPartition) + } + } + catch { + case t: Throwable => + error("Error in loading offsets from " + topicPartition, t) + } + finally { + loadingPartitions synchronized loadingPartitions.remove(partition) + } + } + } + + /** + * This is the zookeeper listener that triggers all the state transitions for a replica + */ + class BrokerChangeListener() extends IZkChildListener with Logging { + def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { + if (!hasStarted.get) + return + try { + val curBrokerIds = currentBrokerList.map(_.toInt).toSet + val newBrokerIds = curBrokerIds -- liveBrokerIdsUnderlying + val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)) + val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get) + val deadBrokerIds = liveBrokerIdsUnderlying -- curBrokerIds + val liveBrokersUnderlying = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) + liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id) + + newBrokers.foreach(channelManager.addBroker(_)) + deadBrokerIds.foreach( broker => { + channelManager.getRemainingRequestAndCallback(broker).foreach ( item => { + val txRequest = item.asInstanceOf[(RequestOrResponse, (RequestOrResponse) => Unit)]._1.asInstanceOf[TransactionRequest] + enqueueUnsentRequests(txRequest, txRequest.requestInfo.txPartitions) + }) + channelManager.removeBroker(broker) + }) + } catch { + case e: Throwable => error("Error while handling broker changes", e) + } + } + } + + def sendTxRequestToLeaders(txRequest: TransactionRequest, callback: (RequestOrResponse) => Unit, metadataCache: MetadataCache) { + if (!hasStarted.get) + return + + val partitionAndLeader = txRequest.requestInfo.txPartitions.map(topicAndPartition => { + val partitionInfoOpt = metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition) + val leaderOpt = partitionInfoOpt.map(_.leaderIsrAndControllerEpoch.leaderAndIsr.leader) + topicAndPartition -> leaderOpt + }).toMap + + val leaders = partitionAndLeader.values.filter(_.isDefined).map(_.get).toSet + val partitionsWithoutLeader = 
partitionAndLeader.filterNot(_._2.isDefined).keys + + leaders.foreach(leader => + channelManager.sendRequest(leader, txRequest, callback) + ) + enqueueUnsentRequests(txRequest, partitionsWithoutLeader) + } + + private def enqueueUnsentRequests (txRequest: TransactionRequest, partitions: Iterable[TopicAndPartition]) { + unsentRequestsLock synchronized { + partitions.foreach(topicAndPartition => { + if (!unsentRequestQueues.contains(topicAndPartition)) { + unsentRequestQueues.put(topicAndPartition, new mutable.Queue[TransactionRequest]) + unsentRequestSets.put(topicAndPartition, mutable.HashSet[TransactionRequest]()) + } + unsentRequestQueues(topicAndPartition) += txRequest + unsentRequestSets(topicAndPartition) += txRequest + }) + } + } + + def maybeResendRequests(callback: (RequestOrResponse) => Unit, metadataCache: MetadataCache) { + if (!hasStarted.get) + return + unsentRequestsLock synchronized { + val topicAndPartitions = unsentRequestQueues.keys + topicAndPartitions.foreach(topicAndPartition => { + val txRequests = unsentRequestQueues.getOrElse(topicAndPartition, null) + if (txRequests != null) { + val partitionInfoOpt = metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition) + val leaderOpt = partitionInfoOpt.map(_.leaderIsrAndControllerEpoch.leaderAndIsr.leader) + if (leaderOpt.isDefined) { + txRequests.foreach(txRequest => + channelManager.sendRequest(leaderOpt.get, txRequest, callback) + ) + unsentRequestQueues.remove(topicAndPartition) + unsentRequestSets.remove(topicAndPartition) + } + } + }) + } + } + + private def checkpointRecoveryOffsets() { + recoveryOffsetLock synchronized { + val localRecoveryOffsets = recoveryOffsets.filterKeys(partition => + replicaManager.isLeaderReplicaLocal(TransactionManager.TransactionTopicName, partition) + ) + if (localRecoveryOffsets.size > 0) + ZkUtils.updateTransactionRecoveryOffset(zkClient, localRecoveryOffsets.toMap) + } + } + + def updateUnackedRequest(txRequest: TransactionRequest, nextOffset: Long) { + if (txRequest.requestInfo.txControl == TxRequestTypes.PreCommit || + txRequest.requestInfo.txControl == TxRequestTypes.PreAbort) { + val partition = partitionFor(txRequest.requestInfo.txGroupId) + recoveryOffsetLock synchronized { + unackedRequestSets(partition) += txRequest + unackedRequestAndOffsetQueues(partition) += TransactionRequestAndOffset(txRequest, nextOffset) + } + } + } + + def updateAckedRequest(txRequest: TransactionRequest) { + val partition = partitionFor(txRequest.requestInfo.txGroupId) + + recoveryOffsetLock synchronized { + unackedRequestSets(partition) -= txRequest + unackedRequestAndOffsetQueues(partition) = unackedRequestAndOffsetQueues(partition).dropWhile{ case TransactionRequestAndOffset(request, nextOffset) => { + val shouldDrop = !unackedRequestSets(partition).contains(request) + if (shouldDrop) recoveryOffsets(partition) = nextOffset + shouldDrop + }} + } + } + + def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % config.transactionTopicNumPartitions + + def getNextTransactionId(): Int = { + synchronized { + if (nextTxId == txIdBatchEnd) { + val newTxIdBatch = ZkUtils.getNewTransactionIdBatch(zkClient) + nextTxId = newTxIdBatch * config.transactionsIdBatchSize + txIdBatchEnd = (newTxIdBatch + 1) * config.transactionsIdBatchSize + } + nextTxId += 1 + nextTxId - 1 + } + } +} + +object TransactionManager { + + val TransactionTopicName = "__transaction_control" + + private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema) + + private val CURRENT_TRANSACTION_SCHEMA_VERSION = 0.toShort + +
private val TRANSACTION_KEY_SCHEMA_V0 = new Schema() + + private val TRANSACTION_VALUE_SCHEMA_V0 = new Schema(new Field("groupid", STRING), + new Field("txid", INT32), + new Field("txcontrol", INT16), + new Field("txtimeout", INT32), + new Field("txpartitions", + new ArrayOf(new Schema( + new Field("topic", STRING), + new Field("partition", INT32))))) + + // map of versions to schemas + private val TRANSACTION_SCHEMAS = Map(0 -> KeyAndValueSchemas(TRANSACTION_KEY_SCHEMA_V0, TRANSACTION_VALUE_SCHEMA_V0)) + + private val CURRENT_SCHEMA = schemaFor(CURRENT_TRANSACTION_SCHEMA_VERSION) + + private def schemaFor(version: Int) = { + val schemaOpt = TRANSACTION_SCHEMAS.get(version) + schemaOpt match { + case Some(schema) => schema + case _ => throw new KafkaException("Unknown transaction schema version " + version) + } + } + + def transactionRequestValue(txRequestInfo: TransactionRequestInfo): Array[Byte] = { + val value = new Struct(CURRENT_SCHEMA.valueSchema) + value.set("groupid", txRequestInfo.txGroupId) + value.set("txid", txRequestInfo.txId) + value.set("txcontrol", txRequestInfo.txControl) + value.set("txtimeout", txRequestInfo.txTimeoutMs) + + val topicAndPartitions = ArrayBuffer[Struct]() + for (tp <- txRequestInfo.txPartitions) { + val topicAndPartition = value.instance("txpartitions"); + topicAndPartition.set("topic", tp.topic) + topicAndPartition.set("partition", tp.partition) + topicAndPartitions += topicAndPartition + } + value.set("txpartitions", topicAndPartitions.toArray) + + val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) + byteBuffer.putShort(CURRENT_TRANSACTION_SCHEMA_VERSION) + value.writeTo(byteBuffer) + byteBuffer.array() + } + + def readMessageValue(buffer: ByteBuffer): TransactionRequestInfo = { + val version = buffer.getShort() + val valueSchema = schemaFor(version).valueSchema + val value = valueSchema.read(buffer).asInstanceOf[Struct] + + val txGroupId = value.get("groupid").asInstanceOf[String] + val txId = value.get("txid").asInstanceOf[Int] + val txControl = value.get("txcontrol").asInstanceOf[Short] + val txTimeoutMs = value.get("txtimeout").asInstanceOf[Int] + + val txPartitions = value.getArray("txpartitions").map { + subValue => { + val topicAndPartition = subValue.asInstanceOf[Struct] + val topic = topicAndPartition.get("topic").asInstanceOf[String] + val partition = topicAndPartition.get("partition").asInstanceOf[Int] + TopicAndPartition(topic, partition) + } + } + TransactionRequestInfo(txGroupId, txId, txControl, txTimeoutMs, txPartitions) + } + +} diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index dcdc1ce..2d1fd5f 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -46,6 +46,8 @@ object ZkUtils extends Logging { val ReassignPartitionsPath = "/admin/reassign_partitions" val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" + val transactionIdPath = "/transaction/ids" + val transactionRecoveryPath = "/transaction/recovery_offsets" def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic @@ -61,6 +63,9 @@ object ZkUtils extends Logging { def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic + def getTransactionPartitionRecoveryPath(partition: Int): String = + transactionRecoveryPath + "/" + partition + def getController(zkClient: ZkClient): Int = { readDataMaybeNull(zkClient, ControllerPath)._1 match { case 
Some(controller) => KafkaController.parseControllerId(controller) @@ -617,6 +622,41 @@ object ZkUtils extends Logging { "replicas" -> e._2)))) } + def getTransactionRecoveryOffset(zkClient: ZkClient): Map[Int, Long] = { + val partitions = try { + getChildren(zkClient, transactionRecoveryPath) + } catch { + case nne: ZkNoNodeException => + zkClient.createPersistent(transactionRecoveryPath, true) + debug("Created path %s for transaction partition recovery".format(transactionRecoveryPath)) + Seq.empty[String] + case e2: Throwable => throw new AdminOperationException(e2.toString) + } + + val recoveryPoints = partitions.map { partition => { + val zkPath = getTransactionPartitionRecoveryPath(partition.toInt) + val data = readData(zkClient, zkPath)._1 + (partition.toInt, data.toLong) + }}.toMap + recoveryPoints + } + + def updateTransactionRecoveryOffset(zkClient: ZkClient, recoveryPoints: Map[Int, Long]) { + recoveryPoints.foreach{ case (partition: Int, recoveryPoint: Long) => { + val zkPath = getTransactionPartitionRecoveryPath(partition) + val data = recoveryPoint.toString + try { + updatePersistentPath(zkClient, zkPath, data) + info("Updated transaction partition %s with recovery offset %s".format(partition, recoveryPoint)) + } catch { + case nne: ZkNoNodeException => + ZkUtils.createPersistentPath(zkClient, zkPath, data) + debug("Created path %s with %s for transaction partition recovery".format(zkPath, recoveryPoint)) + case e2: Throwable => throw new AdminOperationException(e2.toString) + } + }} + } + def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) { val zkPath = ZkUtils.ReassignPartitionsPath partitionsToBeReassigned.size match { @@ -708,6 +748,26 @@ object ZkUtils extends Logging { }.flatten.toSet } } + + def getNewTransactionIdBatch(zkClient: ZkClient): Int = { + try { + val stat = zkClient.writeDataReturnStat(transactionIdPath, "", -1) + stat.getVersion + } catch { + case e: ZkNoNodeException => { + createParentPath(zkClient, transactionIdPath) + try { + zkClient.createPersistent(transactionIdPath, "") + } catch { + case e: ZkNodeExistsException => + case e2: Throwable => throw e2 + } + val stat = zkClient.writeDataReturnStat(transactionIdPath, "", -1) + stat.getVersion + } + case e2: Throwable => throw e2 + } + } } object ZKStringSerializer extends ZkSerializer { diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index b1c4ce9..a85109c 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -87,13 +87,13 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.replay(replicaManager) val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) - + val transactionManager = EasyMock.createMock(classOf[kafka.server.TransactionManager]) val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController]) // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary) // don't provide replica or leader callbacks since they will not be tested here val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller) + val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, transactionManager, zkClient, configs.head.brokerId, configs.head, controller) val partitionStateInfo = 
EasyMock.createNiceMock(classOf[PartitionStateInfo]) apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo) @@ -163,11 +163,11 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.replay(replicaManager) val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) - + val transactionManager = EasyMock.createMock(classOf[kafka.server.TransactionManager]) val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController]) val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller) + val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, transactionManager, zkClient, configs.head.brokerId, configs.head, controller) val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo]) apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo) EasyMock.replay(partitionStateInfo) -- 1.7.12.4