From b8ddfe0d5605758ada30c563ca21f29d5eba40d8 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 5 Aug 2014 13:37:53 -0700 Subject: [PATCH] KAFKA-1565 Transaction manager failover handling --- .../main/scala/kafka/api/TransactionRequest.scala | 2 +- .../main/scala/kafka/api/TransactionResponse.scala | 7 + core/src/main/scala/kafka/cluster/Partition.scala | 7 +- .../controller/ControllerChannelManager.scala | 28 +- core/src/main/scala/kafka/server/KafkaApis.scala | 150 +++++----- core/src/main/scala/kafka/server/KafkaConfig.scala | 14 +- core/src/main/scala/kafka/server/KafkaServer.scala | 5 +- .../main/scala/kafka/server/ReplicaManager.scala | 22 +- .../main/scala/kafka/server/RequestPurgatory.scala | 6 + .../scala/kafka/server/TransactionManager.scala | 317 ++++++++++++++++----- core/src/main/scala/kafka/utils/ZkUtils.scala | 4 +- 11 files changed, 394 insertions(+), 168 deletions(-) diff --git a/core/src/main/scala/kafka/api/TransactionRequest.scala b/core/src/main/scala/kafka/api/TransactionRequest.scala index c315bc3..8015ec9 100644 --- a/core/src/main/scala/kafka/api/TransactionRequest.scala +++ b/core/src/main/scala/kafka/api/TransactionRequest.scala @@ -43,7 +43,7 @@ object TransactionRequest { } } -object TxControlTypes { +object TxRequestTypes { val Ongoing: Short = 0 val Begin: Short = 1 val PreCommit: Short = 2 diff --git a/core/src/main/scala/kafka/api/TransactionResponse.scala b/core/src/main/scala/kafka/api/TransactionResponse.scala index 716c013..8a84dfd 100644 --- a/core/src/main/scala/kafka/api/TransactionResponse.scala +++ b/core/src/main/scala/kafka/api/TransactionResponse.scala @@ -86,5 +86,12 @@ case class TransactionResponse(override val correlationId: Int, }) } + override def toString(): String = { + val requestInfo = new StringBuilder + requestInfo.append("txId: " + txId) + requestInfo.append("; status: " + status) + requestInfo.toString() + } + override def describe(details: Boolean):String = { toString } } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index f2ca856..e9b1373 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,7 +22,7 @@ import kafka.utils.{ReplicationUtils, Pool, Time, Logging} import kafka.utils.Utils.inLock import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{OffsetManager, ReplicaManager} +import kafka.server.{TransactionManager, OffsetManager, ReplicaManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -167,7 +167,8 @@ class Partition(val topic: String, */ def makeLeader(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int, - offsetManager: OffsetManager): Boolean = { + offsetManager: OffsetManager, + newTxPartitions: mutable.Set[Int]): Boolean = { inLock(leaderIsrUpdateLock.writeLock()) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -190,6 +191,8 @@ class Partition(val topic: String, maybeIncrementLeaderHW(getReplica().get) if (topic == OffsetManager.OffsetsTopicName) offsetManager.loadOffsetsFromLog(partitionId) + if (topic == TransactionManager.TransactionTopicName) + newTxPartitions.add(partitionId) true } } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index f578961..bf31032 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -30,12 +30,15 @@ import kafka.common.TopicAndPartition import kafka.api.RequestOrResponse import collection.Set -class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging { +class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig, liveBrokers: Set[Broker] = null) extends Logging { private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " - controllerContext.liveBrokers.foreach(addNewBroker(_)) + if (liveBrokers == null) + controllerContext.liveBrokers.foreach(addNewBroker(_)) + else + liveBrokers.foreach(addNewBroker(_)) def startup() = { brokerLock synchronized { @@ -89,6 +92,19 @@ class ControllerChannelManager (private val controllerContext: ControllerContext brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread)) } + def getRemainingRequestAndCallback(brokerId: Int): List[Any] = { + brokerLock synchronized { + val brokerState = brokerStateInfo(brokerId) + brokerState.requestSendThread.shutdown() + val items = brokerState.messageQueue.toArray.toList + val curItem = brokerState.requestSendThread.curQueueItem + if (curItem == null) + items + else + curItem:: items + } + } + private def removeExistingBroker(brokerId: Int) { try { brokerStateInfo(brokerId).channel.disconnect() @@ -115,12 +131,13 @@ class RequestSendThread(val controllerId: Int, extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) { private val lock = new Object() private val stateChangeLogger = KafkaController.stateChangeLogger + var curQueueItem: (RequestOrResponse, (RequestOrResponse) => Unit) = null connectToBroker(toBroker, channel) override def doWork(): Unit = { - val queueItem = queue.take() - val request = queueItem._1 - val callback = queueItem._2 + curQueueItem = queue.take() + val request = curQueueItem._1 + val callback = curQueueItem._2 var receive: Receive = null try { lock synchronized { @@ -161,6 +178,7 @@ class RequestSendThread(val controllerId: Int, if(callback != null) { callback(response) } + curQueueItem = null } } catch { case e: Throwable => diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c0f5177..7ea5d06 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -47,7 +47,8 @@ class KafkaApis(val requestChannel: RequestChannel, val config: KafkaConfig, val controller: KafkaController) extends Logging { - private val commitRequestPurgatory = new CommitRequestPurgatory + private val transactionRequestPurgatory = + new TransactionRequestPurgatory(config.transactionPurgatoryPurgeIntervalRequests) private val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests) private val fetchRequestPurgatory = @@ -92,7 +93,9 @@ class KafkaApis(val requestChannel: RequestChannel, // stop serving data to clients for the topic being deleted val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] try { - val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) + val newTxPartitions = mutable.Set.empty[Int] + val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager, newTxPartitions) + transactionManager.onTransactionManagerFailover(newTxPartitions, (txRequest: TransactionRequest) => watchAndSendRequests(txRequest)) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) } catch { @@ -116,7 +119,8 @@ class KafkaApis(val requestChannel: RequestChannel, def handleUpdateMetadataRequest(request: RequestChannel.Request) { val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest] replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache) - + transactionManager.maybeResendRequests((response: RequestOrResponse) => + transactionRequestPurgatory.update(response), metadataCache) val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse))) } @@ -152,21 +156,14 @@ class KafkaApis(val requestChannel: RequestChannel, val byteBufferMessageSet =new ByteBufferMessageSet( config.transactionsTopicCompressionCodec, new Message( - bytes = TransactionManager.transactionControlValue(txRequest.requestInfo), - key = Array.empty[Byte], + bytes = TransactionManager.transactionRequestValue(txRequest.requestInfo), + key = null, txId = txRequest.requestInfo.txId, txControl = txRequest.requestInfo.txControl)) - val localPartitions = txRequest.requestInfo.txPartitions.filter(topicAndPartition => { - val partitionInfoOpt = metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition) - val localBrokerId = replicaManager.config.brokerId - partitionInfoOpt match { - case Some(partitionInfo) => partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == localBrokerId - case None => false - }}) - + val localPartitions = txRequest.requestInfo.txPartitions.filter(topicAndPartition => + replicaManager.isLeaderReplicaLocal(topicAndPartition.topic, topicAndPartition.partition)) val producerData = mutable.Map(localPartitions.map(topicAndPartition => topicAndPartition -> byteBufferMessageSet): _*) - val request = ProducerRequest( correlationId = txRequest.correlationId, clientId = txRequest.clientId, @@ -179,8 +176,8 @@ class KafkaApis(val requestChannel: RequestChannel, private def producerRequestFromTxRequestToCoordinator(txRequest: TransactionRequest) = { val message = new Message( - bytes = TransactionManager.transactionControlValue(txRequest.requestInfo), - key = Array.empty[Byte], + bytes = TransactionManager.transactionRequestValue(txRequest.requestInfo), + key = null, txId = txRequest.requestInfo.txId, txControl = txRequest.requestInfo.txControl) @@ -224,15 +221,6 @@ class KafkaApis(val requestChannel: RequestChannel, request } - def sendTxRequestToBrokers(txRequest: TransactionRequest) { - val partitionInfoOpts = txRequest.requestInfo.txPartitions.map ( topicAndPartition => - metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition)) - val brokers = partitionInfoOpts.filter(_ != None).map(_.get.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet - brokers.foreach(broker => - transactionManager.sendRequest(broker, txRequest, (response: RequestOrResponse) => commitRequestPurgatory.update(response)) - ) - } - /** * 1. on receiving CoordinatorBegin message * - append to transactionTopic @@ -256,16 +244,16 @@ class KafkaApis(val requestChannel: RequestChannel, def handleTransactionRequest(request: RequestChannel.Request) { var txRequest = request.requestObj.asInstanceOf[TransactionRequest] val txControl = txRequest.requestInfo.txControl - if (txControl == TxControlTypes.Begin) { + if (txControl == TxRequestTypes.Begin) { val newTxId = transactionManager.getNextTransactionId txRequest = txRequest.copy(requestInfo = txRequest.requestInfo.copy(txId = newTxId)) } val producerRequest = txControl match { - case TxControlTypes.Begin | TxControlTypes.PreCommit | TxControlTypes.PreAbort => + case TxRequestTypes.Begin | TxRequestTypes.PreCommit | TxRequestTypes.PreAbort => producerRequestFromTxRequestToCoordinator(txRequest) - case TxControlTypes.Commit | TxControlTypes.Abort => + case TxRequestTypes.Commit | TxRequestTypes.Abort => producerRequestFromTxRequestToBroker(txRequest) - case _ => throw new KafkaException("Unhandled Transaction Control Key %i".format(txControl)) + case _ => throw new KafkaException("Unhandled Transaction Control Key %d".format(txControl)) } val localProduceResults = appendToLocalLog(producerRequest) @@ -802,6 +790,7 @@ class KafkaApis(val requestChannel: RequestChannel, debug("Shutting down.") fetchRequestPurgatory.shutdown() producerRequestPurgatory.shutdown() + transactionRequestPurgatory.shutdown() debug("Shut down complete.") } @@ -866,53 +855,76 @@ class KafkaApis(val requestChannel: RequestChannel, } } - class DelayedCommit(val txRequest: TransactionRequest) { - val satisfied = new AtomicBoolean(false) + def watchAndSendRequests(txRequest: TransactionRequest) { + val newRequestOpt = txRequest.requestInfo.txControl match { + case TxRequestTypes.PreCommit => Some(TransactionRequest.TransactionWithNewControl(txRequest, TxRequestTypes.Commit)) + case TxRequestTypes.PreAbort => Some(TransactionRequest.TransactionWithNewControl(txRequest, TxRequestTypes.Abort)) + case _ => None + } + + newRequestOpt match { + case Some(newRequest) => + val delayedCommit = new DelayedCommit(txRequest) + transactionRequestPurgatory.watch(delayedCommit) + transactionManager.sendTxRequestToLeaders(newRequest, (response: RequestOrResponse) => + transactionRequestPurgatory.update(response), metadataCache) + case None => + } + } + + /* TODO 1) move appendToLocal() to replicaManager + TODO 2) move DelayedCommit and TransactionRequestPurgatory to TransactionManager + */ + class DelayedCommit(keys: Seq[Int], + val txRequest: TransactionRequest, + delayMs: Long) + extends DelayedRequest(keys, null, delayMs) with Logging{ + + def this(txRequest: TransactionRequest) = + this(List(txRequest.requestInfo.txId), txRequest, config.transactionsAckTimeoutMs.toLong) + val partitions = new mutable.HashSet[TopicAndPartition] partitions ++= txRequest.requestInfo.txPartitions - def checkSatisfied(ackedPartitions: Seq[TopicAndPartition]) = { - this synchronized { - partitions --= ackedPartitions - partitions.isEmpty - } + def isSatisfied(response: RequestOrResponse) = { + val txResponse = response.asInstanceOf[TransactionResponse] + partitions --= txResponse.status.filter(_._2 == ErrorMapping.NoError).keys.toSeq + partitions.isEmpty } def respond() { val txRequestToLog = txRequest.requestInfo.txControl match { - case TxControlTypes.Commit => - TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Committed) - case TxControlTypes.Abort => - TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Aborted) + case TxRequestTypes.PreCommit => + TransactionRequest.TransactionWithNewControl(txRequest, TxRequestTypes.Committed) + case TxRequestTypes.PreAbort => + TransactionRequest.TransactionWithNewControl(txRequest, TxRequestTypes.Aborted) } val producerRequest = producerRequestFromTxRequestToCoordinator(txRequestToLog) appendToLocalLog(producerRequest) - transactionManager.checkpointTransactionHW(txRequest) + transactionManager.updateAckedRequest(txRequest) } } - class CommitRequestPurgatory() { - private val commitRequestMap = new ConcurrentHashMap[Int, DelayedCommit]() + private [kafka] class TransactionRequestPurgatory(purgeInterval: Int) + extends RequestPurgatory[DelayedCommit, RequestOrResponse](brokerId, purgeInterval) { - def watch(txId: Int, delayed: DelayedCommit) { - assert(!commitRequestMap.contains(txId)) - commitRequestMap.put(txId, delayed) + protected def checkSatisfied(response: RequestOrResponse, delayedCommit: DelayedCommit) = { + delayedCommit.isSatisfied(response) } - def update(response: RequestOrResponse) { - val txResponse = response.asInstanceOf[TransactionResponse] - val partitions = txResponse.status.filter(_._2 == ErrorMapping.NoError).keys.toSeq - val delayedCommit = commitRequestMap.get(txResponse.txId) + protected def expire(delayedCommit: DelayedCommit) { + remove(List(delayedCommit.txRequest.requestInfo.txId)) + watchAndSendRequests(delayedCommit.txRequest) + } - if (delayedCommit != null) { - if (delayedCommit.checkSatisfied(partitions)) { - if (delayedCommit.satisfied.compareAndSet(false, true)) - delayedCommit.respond() - } - } else { - info("txId %i not found in commitRequestMap".format(txResponse.txId)) - } + def update(response: RequestOrResponse) { + val txId = response.asInstanceOf[TransactionResponse].txId + val satisfiedRequests = update(txId, response) + satisfiedRequests.foreach(request => { + request.respond() + remove(List(txId)) + }) } } @@ -938,12 +950,6 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus)) } - def watchAndSendRequests(txRequest: TransactionRequest) { - val delayedCommit = new DelayedCommit(txRequest) - commitRequestPurgatory.watch(txRequest.requestInfo.txId, delayedCommit) - sendTxRequestToBrokers(txRequest) - } - def respond() { val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) => topicAndPartition -> delayedStatus.status @@ -957,7 +963,7 @@ class KafkaApis(val requestChannel: RequestChannel, offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo)) transactionRequestOpt.foreach(txRequest => partitionStatus.foreach{ case (topicAndPartition, delayedStatus) => - transactionManager.recordPendingRequest(txRequest, delayedStatus.requiredOffset)}) + transactionManager.updateUnackedRequest(txRequest, delayedStatus.requiredOffset)}) } val response: RequestOrResponse = @@ -970,16 +976,8 @@ class KafkaApis(val requestChannel: RequestChannel, // For preCommit/preAbort, forward commit/abort to partitions involved in the transaction if (errorCode == ErrorMapping.NoError) { - transactionRequestOpt.foreach(txRequest => txRequest.requestInfo.txControl match { - case TxControlTypes.PreCommit => - val newTxRequest = TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Commit) - watchAndSendRequests(newTxRequest) - case TxControlTypes.PreAbort => - val newTxRequest = TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Abort) - watchAndSendRequests(newTxRequest) - case _ => - } - )} + if (transactionRequestOpt != None) watchAndSendRequests(transactionRequestOpt.get) + } } /** @@ -1008,7 +1006,7 @@ class KafkaApis(val requestChannel: RequestChannel, (false, ErrorMapping.UnknownTopicOrPartitionCode) } if (errorCode != ErrorMapping.NoError) { - fetchPartitionStatus. acksPending = false + fetchPartitionStatus.acksPending = false fetchPartitionStatus.status.error = errorCode } else if (hasEnough) { fetchPartitionStatus.acksPending = false diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1e32707..d3b0443 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -293,7 +293,19 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /*********** Transaction management configuration ***********/ - /** The number of partitions for the offset commit topic (should not change after deployment). */ + /* the purge interval (in number of requests) of the transaction request purgatory */ + val transactionPurgatoryPurgeIntervalRequests = props.getInt("transaction.purgatory.purge.interval.requests", 10000) + + /** Batch size for reading from the transaction requests when re-sending requests to leaders. */ + val transactionsLoadBufferSize = props.getIntInRange("transaction.load.buffer.size", + TransactionManagerConfig.DefaultLoadBufferSize, (1, Integer.MAX_VALUE)) + + /* Timeout value for sending transaction requests to leaders involved in the transaction. + * This is similar to the producer request timeout. */ + val transactionsAckTimeoutMs = props.getIntInRange("transaction.ack.timeout.ms", + TransactionManagerConfig.DefaultTransactionAckTimeoutMs, (1, Integer.MAX_VALUE)) + + /** The number of partitions for the transaction topic (should not change after deployment). */ val transactionsTopicPartitions: Int = props.getIntInRange("transaction.topic.num.partitions", TransactionManagerConfig.DefaultTransactionsTopicNumPartitions, (1, Integer.MAX_VALUE)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 3c5022d..9501cbb 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -321,9 +321,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg private def createTransactionManager(): TransactionManager = { val transactionConfig = TransactionManagerConfig( - transactionsTopicNumPartitions = config.transactionsTopicPartitions) + transactionTopicNumPartitions = config.transactionsTopicPartitions, + transactionLoadBufferSize = config.transactionsLoadBufferSize, + transactionAckTimeoutMs = config.transactionsAckTimeoutMs) new TransactionManager(config = transactionConfig, controller = kafkaController, + replicaManager = replicaManager, scheduler = kafkaScheduler, flushCheckpointMs = config.transactionFlushOffsetCheckpointIntervalMs, zkClient = zkClient) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6a56a77..7533dfd 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -189,6 +189,18 @@ class ReplicaManager(val config: KafkaConfig, throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition)) } + def isLeaderReplicaLocal(topic: String, partitionId: Int): Boolean = { + val partitionOpt = getPartition(topic, partitionId) + partitionOpt match { + case None => + false + case Some(partition) => + partition.leaderReplicaIfLocal match { + case Some(leaderReplica) => true + case None => false + } + } } + def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica = { val partitionOpt = getPartition(topic, partitionId) partitionOpt match { @@ -229,7 +241,8 @@ class ReplicaManager(val config: KafkaConfig, } def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, - offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = { + offsetManager: OffsetManager, + newTxPartitions: mutable.Set[Int]): (collection.Map[(String, Int), Short], Short) = { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId, @@ -280,7 +293,7 @@ class ReplicaManager(val config: KafkaConfig, val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys) if (!partitionsTobeLeader.isEmpty) - makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager) + makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager, newTxPartitions) if (!partitionsToBeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager) @@ -310,7 +323,8 @@ class ReplicaManager(val config: KafkaConfig, private def makeLeaders(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], - offsetManager: OffsetManager) = { + offsetManager: OffsetManager, + newTxPartitions: mutable.Set[Int]) = { partitionState.foreach(state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-leader transition for partition %s") @@ -329,7 +343,7 @@ class ReplicaManager(val config: KafkaConfig, } // Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => - partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)} + partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager, newTxPartitions)} // Finally add these partitions to the list of partitions for which the leader is the current broker leaderPartitionsLock synchronized { diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 3d0ff1e..82aeeac 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -101,6 +101,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge expiredRequestReaper.enqueue(delayedRequest) } + def remove(keys: Seq[Any]) { + for (key <- keys) { + watchersForKey.remove(key) + } + } + /** * Update any watchers and return a list of newly satisfied requests. */ diff --git a/core/src/main/scala/kafka/server/TransactionManager.scala b/core/src/main/scala/kafka/server/TransactionManager.scala index 12d7c74..2112e92 100644 --- a/core/src/main/scala/kafka/server/TransactionManager.scala +++ b/core/src/main/scala/kafka/server/TransactionManager.scala @@ -2,80 +2,175 @@ package kafka.server import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.ReentrantLock +import kafka.log.FileMessageSet import collection.JavaConversions._ -import kafka.utils.Utils._ import kafka.api._ import kafka.controller.{ControllerChannelManager, KafkaController} import org.I0Itec.zkclient.{IZkChildListener, ZkClient} import org.apache.kafka.common.protocol.types._ import org.apache.kafka.common.protocol.types.Type._ -import kafka.common.KafkaException +import kafka.common.{TopicAndPartition, KafkaException} import java.nio.ByteBuffer import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import kafka.utils.{Logging, Scheduler, ZkUtils, Utils} -import kafka.message.NoCompressionCodec +import kafka.utils._ +import kafka.message.{ByteBufferMessageSet, NoCompressionCodec} -case class TransactionManagerConfig (transactionsTopicNumPartitions: Int) +case class TransactionManagerConfig (transactionTopicNumPartitions: Int, + transactionLoadBufferSize: Int, + transactionAckTimeoutMs: Int) object TransactionManagerConfig { val DefaultTransactionsTopicNumPartitions = 1 val DefaultTransactionsTopicCompressionCodec = NoCompressionCodec val DefaultTransactionsIdBatchSize = 1000 val DefaultTransactionsTopicReplicationFactor = 1.toShort + val DefaultLoadBufferSize = 5*1024*1024 + val DefaultTransactionAckTimeoutMs = 5000 } case class TransactionRequestAndOffset(txRequest: TransactionRequest, nextOffset: Long) {} class TransactionManager(val config: TransactionManagerConfig, val controller: KafkaController, + val replicaManager: ReplicaManager, scheduler: Scheduler, val flushCheckpointMs: Int, - val zkClient: ZkClient = null) { + val zkClient: ZkClient = null) extends Logging { var nextTxId = -1 var txIdBatchEnd = -1 val InitialTaskDelayMs = 30*1000 + val localClientId = replicaManager.config.brokerId.toString - private var channelManager: ControllerChannelManager = null private val hasStarted = new AtomicBoolean(false) - private val channelLock: ReentrantLock = new ReentrantLock() + private var channelManager: ControllerChannelManager = null private var liveBrokerIdsUnderlying: Set[Int] = Set.empty - - - private val lock = new Object - private val recoveryPoints = mutable.Map[Int, Long]() - // The set of txRequests which have not been acknowledged by brokers involved in the transaction - private val requestSets = new mutable.HashMap[Int, mutable.HashSet[TransactionRequest]] - // This queue maintains the earliest not-yet-acknowledged txRequest and its next offset on the log - private val requestAndOffsetQueues = new mutable.HashMap[Int, mutable.Queue[TransactionRequestAndOffset]] + private val loadingPartitions = mutable.HashSet[Int]() + + // protect concurrent access to unackedRequestSets etc. + private val recoveryOffsetLock = new Object + // for each transactionTopic partition, the high watermark from which to start replay + private val recoveryOffsets = mutable.Map[Int, Long]() + // for each transactionTopic partition, the set of txRequests not fully acknowledged by brokers involved in the transaction + private val unackedRequestSets = new mutable.HashMap[Int, mutable.HashSet[TransactionRequest]] + // for each transactionTopic partition, the earliest not-yet-acknowledged txRequest and respective next offset on the log + private val unackedRequestAndOffsetQueues = new mutable.HashMap[Int, mutable.Queue[TransactionRequestAndOffset]] + + // protect concurrent access to unsentRequestQueues + private val unsentRequestsLock = new Object + // for each topicAndPartition, the ordered list of transactionRequest to send + private val unsentRequestQueues = new mutable.HashMap[TopicAndPartition, mutable.Queue[TransactionRequest]] + // for each topicAndPartition, the set of transactionRequest to send + private val unsentRequestSets = new mutable.HashMap[TopicAndPartition, mutable.HashSet[TransactionRequest]] def startup() { /* Checkpoint high watermark for transaction topicPartition in zookeeper*/ - if(scheduler != null) { - scheduler.schedule("kafka-recovery-point-checkpoint", - checkpointRecoveryPointOffsets, - delay = InitialTaskDelayMs, - period = flushCheckpointMs, - TimeUnit.MILLISECONDS) - } - channelManager = new ControllerChannelManager(controller.controllerContext, controller.config) - channelManager.startup() - registerBrokerChangeListener() hasStarted.set(true) } def shutdown() { - hasStarted.set(false) + checkpointRecoveryOffsets() if(channelManager != null) { channelManager.shutdown() channelManager = null } + hasStarted.set(false) + } + + // TODO refactor channelManager functionality out of controller + def onTransactionManagerFailover(partitions: mutable.Set[Int], watchAndSendCallback: (TransactionRequest) => Unit) { + if (!hasStarted.get) + return + if (channelManager == null) { + val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet + channelManager = new ControllerChannelManager(controller.controllerContext, controller.config, liveBrokers) + channelManager.startup() + zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener()) + + if(scheduler != null) { + scheduler.schedule("kafka-recovery-point-checkpoint", + checkpointRecoveryOffsets, + delay = InitialTaskDelayMs, + period = flushCheckpointMs, + TimeUnit.MILLISECONDS) + } + } + + val offsets = ZkUtils.getTransactionRecoveryOffset(zkClient) + recoveryOffsetLock synchronized { + partitions.foreach(partition => { + recoveryOffsets.put(partition, offsets.getOrElse(partition, 0)) + unackedRequestSets.put(partition, new mutable.HashSet[TransactionRequest]()) + unackedRequestAndOffsetQueues.put(partition, new mutable.Queue[TransactionRequestAndOffset]()) + val beginOffset = recoveryOffsets(partition) + val endOffset = getHighWatermark(partition) + loadUnackedRequestsFromLog(partition, beginOffset, endOffset, watchAndSendCallback) + }) + } + } + + private def getHighWatermark(partitionId: Int): Long = { + val partitionOpt = replicaManager.getPartition(TransactionManager.TransactionTopicName, partitionId) + + val hw = partitionOpt.map { partition => + partition.leaderReplicaIfLocal().map(_.highWatermark).getOrElse(-1L) + }.getOrElse(-1L) + + hw } - private def registerBrokerChangeListener() = { - zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener()) + private def loadUnackedRequestsFromLog(partition: Int, beginOffset: Long, endOffset:Long, watchAndSendCallback: (TransactionRequest) => Unit) { + val topicPartition = TopicAndPartition(TransactionManager.TransactionTopicName, partition) + + loadingPartitions synchronized { + if (loadingPartitions.contains(partition)) { + info("Unacked requests load from %s already in progress.".format(topicPartition)) + } else { + loadingPartitions.add(partition) + scheduler.schedule(topicPartition.toString, loadOffsets) + } + } + + def loadOffsets() { + info("Loading offsets from " + topicPartition) + + val startMs = SystemTime.milliseconds + try { + replicaManager.logManager.getLog(topicPartition) match { + case Some(log) => + var currOffset = beginOffset + val buffer = ByteBuffer.allocate(config.transactionLoadBufferSize) + // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 + while (currOffset < endOffset && getHighWatermark(partition) != -1 && hasStarted.get()) { + buffer.clear() + val messages = log.read(currOffset, config.transactionLoadBufferSize).asInstanceOf[FileMessageSet] + messages.readInto(buffer, 0) + val messageSet = new ByteBufferMessageSet(buffer) + messageSet.foreach { msgAndOffset => + //require(msgAndOffset.message.key == null, "Transaction request entry key should be null") + require(msgAndOffset.message.payload != null, "Transaction request entry value should not be null") + val txRequestInfo = TransactionManager.readMessageValue(msgAndOffset.message.payload) + val txRequest = new TransactionRequest(0, localClientId, config.transactionAckTimeoutMs, txRequestInfo) + watchAndSendCallback(txRequest) + currOffset = msgAndOffset.nextOffset + } + } + if (hasStarted.get()) + info("Finished loading offsets from %s in %d milliseconds." + .format(topicPartition, SystemTime.milliseconds - startMs)) + case None => + warn("No log found for " + topicPartition) + } + } + catch { + case t: Throwable => + error("Error in loading offsets from " + topicPartition, t) + } + finally { + loadingPartitions synchronized loadingPartitions.remove(partition) + } + } } /** @@ -83,70 +178,120 @@ class TransactionManager(val config: TransactionManagerConfig, */ class BrokerChangeListener() extends IZkChildListener with Logging { def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { - inLock(channelLock) { - if (hasStarted.get) { - try { - val curBrokerIds = currentBrokerList.map(_.toInt).toSet - val newBrokerIds = curBrokerIds -- liveBrokerIdsUnderlying - val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)) - val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get) - val deadBrokerIds = liveBrokerIdsUnderlying -- curBrokerIds - val liveBrokersUnderlying = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) - liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id) - - newBrokers.foreach(channelManager.addBroker(_)) - deadBrokerIds.foreach(channelManager.removeBroker(_)) - } catch { - case e: Throwable => error("Error while handling broker changes", e) - } - } + if (!hasStarted.get) + return + try { + val curBrokerIds = currentBrokerList.map(_.toInt).toSet + val newBrokerIds = curBrokerIds -- liveBrokerIdsUnderlying + val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)) + val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get) + val deadBrokerIds = liveBrokerIdsUnderlying -- curBrokerIds + val liveBrokersUnderlying = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) + liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id) + + newBrokers.foreach(channelManager.addBroker(_)) + deadBrokerIds.foreach( broker => { + channelManager.getRemainingRequestAndCallback(broker).foreach ( item => { + val txRequest = item.asInstanceOf[(RequestOrResponse, (RequestOrResponse) => Unit)]._1.asInstanceOf[TransactionRequest] + enqueueUnsentRequests(txRequest, txRequest.requestInfo.txPartitions) + }) + channelManager.removeBroker(broker) + }) + } catch { + case e: Throwable => error("Error while handling broker changes", e) } } } - def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = { - channelManager.sendRequest(brokerId, request, callback) + def sendTxRequestToLeaders(txRequest: TransactionRequest, callback: (RequestOrResponse) => Unit, metadataCache: MetadataCache) { + if (!hasStarted.get) + return + + val partitionAndLeader = txRequest.requestInfo.txPartitions.map(topicAndPartition => { + val partitionInfoOpt = metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition) + val leaderOpt = partitionInfoOpt.map(_.leaderIsrAndControllerEpoch.leaderAndIsr.leader) + topicAndPartition -> leaderOpt + }).toMap + + val leaders = partitionAndLeader.values.filter(_.isDefined).map(_.get).toSet + val partitionsWithoutLeader = partitionAndLeader.filterNot(_._2.isDefined).keys + + leaders.foreach(leader => + channelManager.sendRequest(leader, txRequest, callback) + ) + enqueueUnsentRequests(txRequest, partitionsWithoutLeader) } - def checkpointRecoveryPointOffsets() { - ZkUtils.updateTransactionRecoveryOffset(zkClient, recoveryPoints) + private def enqueueUnsentRequests (txRequest: TransactionRequest, partitions: Iterable[TopicAndPartition]) { + unsentRequestsLock synchronized { + partitions.foreach(topicAndPartition => { + if (!unsentRequestQueues.contains(topicAndPartition)) { + unsentRequestQueues.put(topicAndPartition, new mutable.Queue[TransactionRequest]) + unsentRequestSets.put(topicAndPartition, mutable.HashSet[TransactionRequest]()) + } + unsentRequestQueues(topicAndPartition) += txRequest + unsentRequestSets(topicAndPartition) += txRequest + }) + } } - def recordPendingRequest(txRequest: TransactionRequest, nextOffset: Long) { - if (txRequest.requestInfo.txControl == TxControlTypes.PreCommit || - txRequest.requestInfo.txControl == TxControlTypes.PreAbort) { - val partition = partitionFor(txRequest.requestInfo.txGroupId) - lock synchronized { - if (!requestAndOffsetQueues.contains(partition)) { - recoveryPoints.put(partition, -1) - requestSets.put(partition, new mutable.HashSet[TransactionRequest]()) - requestAndOffsetQueues.put(partition, new mutable.Queue[TransactionRequestAndOffset]()) + def maybeResendRequests(callback: (RequestOrResponse) => Unit, metadataCache: MetadataCache) { + if (!hasStarted.get) + return + unsentRequestsLock synchronized { + val topicAndPartitions = unsentRequestQueues.keys + topicAndPartitions.foreach(topicAndPartition => { + val txRequests = unsentRequestQueues.getOrElse(topicAndPartition, null) + if (txRequests != null) { + val partitionInfoOpt = metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition) + val leaderOpt = partitionInfoOpt.map(_.leaderIsrAndControllerEpoch.leaderAndIsr.leader) + if (leaderOpt.isDefined) { + txRequests.foreach(txRequest => + channelManager.sendRequest(leaderOpt.get, txRequest, callback) + ) + unsentRequestQueues.remove(topicAndPartition) + unsentRequestSets.remove(topicAndPartition) + } } - requestSets(partition) += txRequest - requestAndOffsetQueues(partition) += TransactionRequestAndOffset(txRequest, nextOffset) - } + }) } } - def checkpointTransactionHW(txRequest: TransactionRequest) { - val origTxRequest = txRequest.requestInfo.txControl match { - case TxControlTypes.Commit => TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.PreCommit) - case TxControlTypes.Abort => TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.PreAbort) + private def checkpointRecoveryOffsets() { + recoveryOffsetLock synchronized { + val localRecoveryOffsets = recoveryOffsets.filterKeys(partition => + replicaManager.isLeaderReplicaLocal(TransactionManager.TransactionTopicName, partition) + ) + if (localRecoveryOffsets.size > 0) + ZkUtils.updateTransactionRecoveryOffset(zkClient, localRecoveryOffsets) } + } + + def updateUnackedRequest(txRequest: TransactionRequest, nextOffset: Long) { + if (txRequest.requestInfo.txControl == TxRequestTypes.PreCommit || + txRequest.requestInfo.txControl == TxRequestTypes.PreAbort) { + val partition = partitionFor(txRequest.requestInfo.txGroupId) + recoveryOffsetLock synchronized { + unackedRequestSets(partition) += txRequest + unackedRequestAndOffsetQueues(partition) += TransactionRequestAndOffset(txRequest, nextOffset) + } + } + } + def updateAckedRequest(txRequest: TransactionRequest) { val partition = partitionFor(txRequest.requestInfo.txGroupId) - lock synchronized { - requestSets(partition) -= origTxRequest - requestAndOffsetQueues(partition).dropWhile{ case TransactionRequestAndOffset(txRequest, nextOffset) => { - val shouldDrop = !requestSets(partition).contains(txRequest) - if (shouldDrop) recoveryPoints(partition) = nextOffset + recoveryOffsetLock synchronized { + unackedRequestSets(partition) -= txRequest + unackedRequestAndOffsetQueues(partition).dropWhile{ case TransactionRequestAndOffset(txRequest, nextOffset) => { + val shouldDrop = !unackedRequestSets(partition).contains(txRequest) + if (shouldDrop) recoveryOffsets(partition) = nextOffset shouldDrop }} } } - def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % config.transactionsTopicNumPartitions + def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % config.transactionTopicNumPartitions def getNextTransactionId(): Int = { synchronized { @@ -193,8 +338,7 @@ object TransactionManager { } } - - def transactionControlValue(txRequestInfo: TransactionRequestInfo): Array[Byte] = { + def transactionRequestValue(txRequestInfo: TransactionRequestInfo): Array[Byte] = { val value = new Struct(CURRENT_SCHEMA.valueSchema) value.set("groupid", txRequestInfo.txGroupId) value.set("txid", txRequestInfo.txId) @@ -214,6 +358,27 @@ object TransactionManager { byteBuffer.putShort(CURRENT_TRANSACTION_SCHEMA_VERSION) value.writeTo(byteBuffer) byteBuffer.array() - ("(value:" + txRequestInfo.toString + ")").getBytes } + + def readMessageValue(buffer: ByteBuffer): TransactionRequestInfo = { + val version = buffer.getShort() + val valueSchema = schemaFor(version).valueSchema + val value = valueSchema.read(buffer).asInstanceOf[Struct] + + val txGroupId = value.get("groupid").asInstanceOf[String] + val txId = value.get("txid").asInstanceOf[Int] + val txControl = value.get("txcontrol").asInstanceOf[Short] + val txTimeoutMs = value.get("txtimeout").asInstanceOf[Int] + + val txPartitions = value.getArray("txpartitions").map { + subValue => { + val topicAndPartition = subValue.asInstanceOf[Struct] + val topic = topicAndPartition.get("topic").asInstanceOf[String] + val partition = topicAndPartition.get("partition").asInstanceOf[Int] + TopicAndPartition(topic, partition) + } + } + TransactionRequestInfo(txGroupId, txId, txControl, txTimeoutMs, txPartitions) + } + } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index f7e4073..2d1fd5f 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -622,7 +622,7 @@ object ZkUtils extends Logging { "replicas" -> e._2)))) } - def getTransactionRecoveryOffset(zkClient: ZkClient): immutable.Map[Int, Long] = { + def getTransactionRecoveryOffset(zkClient: ZkClient): Map[Int, Long] = { val partitions = try { getChildren(zkClient, transactionRecoveryPath) } catch { @@ -641,7 +641,7 @@ object ZkUtils extends Logging { recoveryPoints } - def updateTransactionRecoveryOffset(zkClient: ZkClient, recoveryPoints: mutable.Map[Int, Long]) { + def updateTransactionRecoveryOffset(zkClient: ZkClient, recoveryPoints: Map[Int, Long]) { recoveryPoints.foreach{ case (partition: Int, recoveryPoint: Long) => { val zkPath = getTransactionPartitionRecoveryPath(partition) val data = recoveryPoint.toString -- 1.7.12.4