diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 8d5c2e7..624e37f 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
 import kafka.consumer.Whitelist
-import kafka.server.OffsetManager
+import kafka.server.{TransactionManager, OffsetManager}
 
 object TopicCommand {
 
@@ -106,8 +106,8 @@ object TopicCommand {
       println("Updated config for topic \"%s\".".format(topic))
     }
     if(opts.options.has(opts.partitionsOpt)) {
-      if (topic == OffsetManager.OffsetsTopicName) {
-        throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
+      if (topic == OffsetManager.OffsetsTopicName || topic == TransactionManager.TransactionTopicName) {
+        throw new IllegalArgumentException("The number of partitions for the " + topic + " topic cannot be changed.")
       }
       println("WARNING: If partitions are increased for a topic that has a key, the partition " +
               "logic or ordering of the messages will be affected")
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index ad75978..91d9052 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,7 +18,7 @@ package kafka.common
 
 import util.matching.Regex
-import kafka.server.OffsetManager
+import kafka.server.{TransactionManager, OffsetManager}
 
 object Topic {
 
@@ -26,7 +26,7 @@ object Topic {
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")
 
-  val InternalTopics = Set(OffsetManager.OffsetsTopicName)
+  val InternalTopics = Set(OffsetManager.OffsetsTopicName, TransactionManager.TransactionTopicName)
 
   def validate(topic: String) {
     if (topic.length <= 0)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 8763968..f578961 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -152,6 +152,8 @@ class RequestSendThread(val controllerId: Int,
                 response = StopReplicaResponse.readFrom(receive.buffer)
               case RequestKeys.UpdateMetadataKey =>
                 response = UpdateMetadataResponse.readFrom(receive.buffer)
+              case RequestKeys.TransactionKey =>
+                response = TransactionResponse.readFrom(receive.buffer)
             }
             stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s"
               .format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString()))
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index d2a7293..feb92ba 100644
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -33,9 +33,11 @@ object Message {
   val CrcLength = 4
   val MagicOffset = CrcOffset + CrcLength
   val MagicLength = 1
+  val TxIdLength = 4
   val AttributesOffset = MagicOffset + MagicLength
   val AttributesLength = 1
-  val KeySizeOffset = AttributesOffset + AttributesLength
+  val TxIdOffset = AttributesOffset + AttributesLength
+  val KeySizeOffset = TxIdOffset + TxIdLength
   val KeySizeLength = 4
   val KeyOffset = KeySizeOffset + KeySizeLength
   val ValueSizeLength = 4
@@ -46,7 +48,7 @@ object Message {
   /**
    * The minimum valid size for the message header
    */
-  val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength
+  val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength + TxIdLength
 
   /**
    * The current "magic" value
@@ -60,6 +62,16 @@ object Message {
   val CompressionCodeMask: Int = 0x07
 
   /**
+   * Specifies the mask for the transaction control within the attributes: 3 bits hold the
+   * transaction control type, and 0 is reserved to indicate a non-transactional message.
+   */
+  val TransactionControlMask: Int = 0x07 << 3
+
+  /**
+   * Specifies the offset for the transaction control within attributes.
+   */
+  val TransactionControlOffset: Int = 3
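+
+  /*
+   * Illustrative layout of the attributes byte under this change (values are examples only):
+   * bits 0-2 carry the compression codec and bits 3-5 carry the transaction control, so a
+   * message with codec 1 (GZIP) and txControl 2 would encode attributes as (1 | (2 << 3)) = 0x11.
+   */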
+
+  /**
   * Compression code for uncompressed messages
   */
  val NoCompression: Int = 0
@@ -76,6 +88,7 @@ object Message {
 * 5. K byte key
 * 6. 4 byte payload length, containing length V
 * 7. V byte payload
+ * 8. 4 byte txId
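+ *
+ * For illustration, with the lengths defined above the fixed part of the header works out to
+ * 4 (crc) + 1 (magic) + 1 (attributes) + 4 (txId) + 4 (key size) + 4 (payload size) = 18 bytes,
+ * and (assuming the crc still starts at offset 0) the txId occupies offsets 6-9, immediately
+ * after the attributes byte and before the key size.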
 *
 * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents.
 */
@@ -91,27 +104,33 @@ class Message(val buffer: ByteBuffer) {
   * @param payloadOffset The offset into the payload array used to extract payload
   * @param payloadSize The size of the payload to use
   */
-  def this(bytes: Array[Byte],
-           key: Array[Byte],
-           codec: CompressionCodec,
-           payloadOffset: Int,
-           payloadSize: Int) = {
-    this(ByteBuffer.allocate(Message.CrcLength +
-                             Message.MagicLength +
-                             Message.AttributesLength +
-                             Message.KeySizeLength +
-                             (if(key == null) 0 else key.length) +
-                             Message.ValueSizeLength +
-                             (if(bytes == null) 0
-                              else if(payloadSize >= 0) payloadSize
-                              else bytes.length - payloadOffset)))
+  def this(bytes: Array[Byte],
+           key: Array[Byte],
+           codec: CompressionCodec,
+           payloadOffset: Int,
+           payloadSize: Int,
+           txId: Int,
+           txControl: Short) = {
+    this(ByteBuffer.allocate(Message.CrcLength +
+                             Message.MagicLength +
+                             Message.AttributesLength +
+                             Message.KeySizeLength +
+                             (if(key == null) 0 else key.length) +
+                             Message.ValueSizeLength +
+                             (if(bytes == null) 0
+                              else if(payloadSize >= 0) payloadSize
+                              else bytes.length - payloadOffset) +
+                             Message.TxIdLength))
     // skip crc, we will fill that in at the end
     buffer.position(MagicOffset)
     buffer.put(CurrentMagicValue)
     var attributes: Byte = 0
-    if (codec.codec > 0)
-      attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
+    if (codec.codec > 0 || txControl > 0)
+      attributes = (attributes | (CompressionCodeMask & codec.codec)
+        | (TransactionControlMask & (txControl << TransactionControlOffset))).toByte
+    buffer.put(attributes)
+    buffer.putInt(txId)
     if(key == null) {
       buffer.putInt(-1)
     } else {
@@ -130,22 +149,25 @@ class Message(val buffer: ByteBuffer) {
     Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
   }
 
-  def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) =
-    this(bytes = bytes, key = key, codec = codec, payloadOffset = 0, payloadSize = -1)
+  def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) =
+    this(bytes = bytes, key = key, codec = codec, payloadOffset = 0, payloadSize = -1, txId = -1, txControl = 0)
 
   def this(bytes: Array[Byte], codec: CompressionCodec) =
     this(bytes = bytes, key = null, codec = codec)
 
-  def this(bytes: Array[Byte], key: Array[Byte]) =
+  def this(bytes: Array[Byte], key: Array[Byte]) =
     this(bytes = bytes, key = key, codec = NoCompressionCodec)
-
+
   def this(bytes: Array[Byte]) =
     this(bytes = bytes, key = null, codec = NoCompressionCodec)
-
+
+  def this(bytes: Array[Byte], key: Array[Byte], txId: Int, txControl: Short) =
+    this(bytes = bytes, key = key, codec = NoCompressionCodec, payloadOffset = 0, payloadSize = -1, txId = txId, txControl = txControl)
+
   /**
    * Compute the checksum of the message from the message contents
    */
-  def computeChecksum(): Long =
+  def computeChecksum(): Long =
     Utils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset)
 
   /**
@@ -211,7 +233,12 @@ class Message(val buffer: ByteBuffer) {
    */
   def compressionCodec: CompressionCodec =
     CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)
-
+
+  /**
+   * The transaction control used with this message
+   */
+  def txControl: Short = ((buffer.get(AttributesOffset) & TransactionControlMask) >> TransactionControlOffset).toShort
+
   /**
    * A ByteBuffer containing the content of the message
    */
@@ -223,6 +250,11 @@ class Message(val buffer: ByteBuffer) {
   def key: ByteBuffer = sliceDelimited(KeySizeOffset)
 
   /**
+   * The txId of the message
+   */
+  def txId: Int = buffer.getInt(TxIdOffset)
+
+  /**
    * Read a size-delimited byte buffer starting at the given offset
    */
   private def sliceDelimited(start: Int): ByteBuffer = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b668f2..c0f5177 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,6 +18,7 @@ package kafka.server
 
 import kafka.api._
+import kafka.cluster.Broker
 import kafka.common._
 import kafka.log._
 import kafka.message._
@@ -27,24 +28,26 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel.Response
 import kafka.controller.KafkaController
 import kafka.utils.{Pool, SystemTime, Logging}
-
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic._
 import scala.collection._
-
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.KafkaException
 
 /**
+ *
  * Logic to handle the various Kafka requests
  */
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val offsetManager: OffsetManager,
+                val transactionManager: TransactionManager,
                 val zkClient: ZkClient,
                 val brokerId: Int,
                 val config: KafkaConfig,
                 val controller: KafkaController) extends Logging {
 
+  private val commitRequestPurgatory = new CommitRequestPurgatory
   private val producerRequestPurgatory =
     new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
   private val fetchRequestPurgatory =
@@ -71,6 +74,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
         case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
+        case RequestKeys.TransactionKey => handleTransactionRequest(request)
+        case RequestKeys.TxCoordinatorMetadataKey => handleTxCoordinatorMetadataRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -143,6 +148,58 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
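+
+  /*
+   * Builds the ProducerRequest that appends a transaction control message to the data partitions
+   * led by this broker. For illustration (topic names are examples only): a Commit covering
+   * orders-0 and payments-3, received by a broker that only leads orders-0, results in a request
+   * that writes the control message to orders-0 alone; the leader of payments-3 appends its own copy.
+   */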
+  private def producerRequestFromTxRequestToBroker(txRequest: TransactionRequest): ProducerRequest = {
+    val byteBufferMessageSet = new ByteBufferMessageSet(
+      config.transactionsTopicCompressionCodec,
+      new Message(
+        bytes = TransactionManager.transactionControlValue(txRequest.requestInfo),
+        key = Array.empty[Byte],
+        txId = txRequest.requestInfo.txId,
+        txControl = txRequest.requestInfo.txControl))
+
+    val localPartitions = txRequest.requestInfo.txPartitions.filter(topicAndPartition => {
+      val partitionInfoOpt = metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition)
+      val localBrokerId = replicaManager.config.brokerId
+      partitionInfoOpt match {
+        case Some(partitionInfo) => partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == localBrokerId
+        case None => false
+      }})
+
+    val producerData = mutable.Map(localPartitions.map(topicAndPartition => topicAndPartition -> byteBufferMessageSet): _*)
+
+    val request = ProducerRequest(
+      correlationId = txRequest.correlationId,
+      clientId = txRequest.clientId,
+      requiredAcks = -1,
+      ackTimeoutMs = txRequest.ackTimeoutMs,
+      data = producerData)
+    trace("Created producer request %s for transaction request %s.".format(request, txRequest))
+    request
+  }
+
+  private def producerRequestFromTxRequestToCoordinator(txRequest: TransactionRequest) = {
+    val message = new Message(
+      bytes = TransactionManager.transactionControlValue(txRequest.requestInfo),
+      key = Array.empty[Byte],
+      txId = txRequest.requestInfo.txId,
+      txControl = txRequest.requestInfo.txControl)
+
+    val partition = transactionManager.partitionFor(txRequest.requestInfo.txGroupId)
+
+    val producerData = mutable.Map(
+      TopicAndPartition(TransactionManager.TransactionTopicName, partition) ->
+        new ByteBufferMessageSet(config.transactionsTopicCompressionCodec, message))
+
+    val request = ProducerRequest(
+      correlationId = txRequest.correlationId,
+      clientId = txRequest.clientId,
+      requiredAcks = -1,
+      ackTimeoutMs = txRequest.ackTimeoutMs,
+      data = producerData)
+    trace("Created producer request %s for transaction request %s.".format(request, txRequest))
+    request
+  }
+
   private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = {
     val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map { case (topicAndPartition, offset) =>
@@ -167,6 +224,83 @@ class KafkaApis(val requestChannel: RequestChannel,
     request
   }
 
+  def sendTxRequestToBrokers(txRequest: TransactionRequest) {
+    val partitionInfoOpts = txRequest.requestInfo.txPartitions.map ( topicAndPartition =>
+      metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition))
+    val brokers = partitionInfoOpts.filter(_ != None).map(_.get.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
+    brokers.foreach(broker =>
+      transactionManager.sendRequest(broker, txRequest, (response: RequestOrResponse) => commitRequestPurgatory.update(response))
+    )
+  }
+
+/**
+ * 1. on receiving CoordinatorBegin message
+ *    - append to transactionTopic
+ *    - wait for ack from followers, and respond
+ *
+ * 2. on receiving PreCommit message
+ *    - append to transactionTopic
+ *    - wait for ack from followers, and respond
+ *    - send Commit message to brokers
+ *    - wait for ack from brokers, and append Committed to transactionTopic
+ *
+ * 3. on receiving Commit message
+ *    - append to topicPartition
+ *    - wait for ack from followers, and respond
+ *
+ * 4. on receiving Abort message
+ *    - append to transactionTopic
+ *    - wait for ack from followers, and respond
+ */
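+
+/*
+ * Illustrative end-to-end commit (names and numbers are examples only): a producer asks for its
+ * coordinator, sends Begin (which assigns a txId), produces data messages tagged with that txId,
+ * then sends PreCommit listing txPartitions. Once the PreCommit entry has been appended to the
+ * transaction topic and acknowledged, the coordinator fans Commit out to the leaders of those
+ * partitions and, after all of them acknowledge, appends a terminal Committed entry and advances
+ * its recovery point.
+ */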
+
+  def handleTransactionRequest(request: RequestChannel.Request) {
+    var txRequest = request.requestObj.asInstanceOf[TransactionRequest]
+    val txControl = txRequest.requestInfo.txControl
+    if (txControl == TxControlTypes.Begin) {
+      val newTxId = transactionManager.getNextTransactionId
+      txRequest = txRequest.copy(requestInfo = txRequest.requestInfo.copy(txId = newTxId))
+    }
+    val producerRequest = txControl match {
+      case TxControlTypes.Begin | TxControlTypes.PreCommit | TxControlTypes.PreAbort =>
+        producerRequestFromTxRequestToCoordinator(txRequest)
+      case TxControlTypes.Commit | TxControlTypes.Abort =>
+        producerRequestFromTxRequestToBroker(txRequest)
+      case _ => throw new KafkaException("Unhandled Transaction Control Key %d".format(txControl))
+    }
+    val localProduceResults = appendToLocalLog(producerRequest)
+
+    // create a list of (topic, partition) pairs to use as keys for this delayed request
+    val producerRequestKeys = producerRequest.data.keys.map(
+      topicAndPartition => new RequestKey(topicAndPartition)).toSeq
+    val statuses = localProduceResults.map(r =>
+      r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
+    val delayedRequest = new DelayedProduce(
+      producerRequestKeys,
+      request,
+      statuses,
+      producerRequest,
+      producerRequest.ackTimeoutMs.toLong,
+      None,
+      Some(txRequest))
+
+    producerRequestPurgatory.watch(delayedRequest)
+
+    /*
+     * Replica fetch requests may have arrived (and potentially satisfied)
+     * delayedProduce requests while they were being added to the purgatory.
+     * Here, we explicitly check if any of them can be satisfied.
+     */
+    var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
+    producerRequestKeys.foreach(key =>
+      satisfiedProduceRequests ++=
+        producerRequestPurgatory.update(key, key))
+    debug(satisfiedProduceRequests.size +
+      " producer requests unblocked during produce to local log.")
+    satisfiedProduceRequests.foreach(_.respond())
+    // we do not need the data anymore
+    producerRequest.emptyData()
+  }
+
   /**
    * Handle a produce request or offset commit request (which is really a specialized producer request)
    */
@@ -238,7 +372,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         statuses,
         produceRequest,
         produceRequest.ackTimeoutMs.toLong,
-        offsetCommitRequestOpt)
+        offsetCommitRequestOpt,
+        None)
 
       producerRequestPurgatory.watch(delayedRequest)
 
@@ -559,7 +694,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (topics.size > 0 && topicResponses.size != topics.size) {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
+        if (topic == OffsetManager.OffsetsTopicName || topic == TransactionManager.TransactionTopicName || config.autoCreateTopicsEnable) {
           try {
             if (topic == OffsetManager.OffsetsTopicName) {
               AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
@@ -567,6 +702,11 @@ class KafkaApis(val requestChannel: RequestChannel,
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                 .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor))
             }
+            else if (topic == TransactionManager.TransactionTopicName) {
+              AdminUtils.createTopic(zkClient, topic, config.transactionsTopicPartitions, config.transactionsTopicReplicationFactor)
+              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                .format(topic, config.transactionsTopicPartitions, config.transactionsTopicReplicationFactor))
+            }
             else {
               AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
@@ -611,6 +751,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
+
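+  /*
+   * Answers the "who is my transaction coordinator?" question. For illustration, a client using
+   * txGroupId "checkout" (an example name) is directed to the leader of transaction-topic
+   * partition partitionFor("checkout"), or receives TxCoordinatorNotAvailableCode while that
+   * partition has no leader yet.
+   */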
+  def handleTxCoordinatorMetadataRequest(request: RequestChannel.Request) {
+    val txCoordinatorMetadataRequest = request.requestObj.asInstanceOf[TxCoordinatorMetadataRequest]
+    val partition = transactionManager.partitionFor(txCoordinatorMetadataRequest.txGroupId)
+    // get metadata (and create the topic if necessary)
+    val transactionTopicMetadata = getTopicMetadata(Set(TransactionManager.TransactionTopicName)).head
+
+    val errorResponse = TxCoordinatorMetadataResponse(None, ErrorMapping.TxCoordinatorNotAvailableCode, txCoordinatorMetadataRequest.correlationId)
+
+    val response =
+      transactionTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
+        partitionMetadata.leader.map { leader =>
+          TxCoordinatorMetadataResponse(Some(leader), ErrorMapping.NoError, txCoordinatorMetadataRequest.correlationId)
+        }.getOrElse(errorResponse)
+      }.getOrElse(errorResponse)
+
+    trace("Sending transaction metadata %s for correlation id %d to client %s."
+      .format(response, txCoordinatorMetadataRequest.correlationId, txCoordinatorMetadataRequest.clientId))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
+
+
   /*
    * Service the consumer metadata API
    */
@@ -704,12 +866,63 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
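+
+  /*
+   * Tracks a Commit/Abort that has been fanned out to the brokers leading the partitions of a
+   * transaction. Once every involved partition has been acknowledged, the coordinator appends the
+   * corresponding terminal Committed/Aborted entry to the transaction topic and checkpoints the
+   * recovery point for that transaction partition.
+   */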
+  class DelayedCommit(val txRequest: TransactionRequest) {
+    val satisfied = new AtomicBoolean(false)
+    val partitions = new mutable.HashSet[TopicAndPartition]
+    partitions ++= txRequest.requestInfo.txPartitions
+
+    def checkSatisfied(ackedPartitions: Seq[TopicAndPartition]) = {
+      this synchronized {
+        partitions --= ackedPartitions
+        partitions.isEmpty
+      }
+    }
+
+    def respond() {
+      val txRequestToLog = txRequest.requestInfo.txControl match {
+        case TxControlTypes.Commit =>
+          TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Committed)
+        case TxControlTypes.Abort =>
+          TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Aborted)
+      }
+
+      val producerRequest = producerRequestFromTxRequestToCoordinator(txRequestToLog)
+      appendToLocalLog(producerRequest)
+      transactionManager.checkpointTransactionHW(txRequest)
+    }
+  }
+
+  class CommitRequestPurgatory() {
+    private val commitRequestMap = new ConcurrentHashMap[Int, DelayedCommit]()
+
+    def watch(txId: Int, delayed: DelayedCommit) {
+      assert(!commitRequestMap.containsKey(txId))
+      commitRequestMap.put(txId, delayed)
+    }
+
+    def update(response: RequestOrResponse) {
+      val txResponse = response.asInstanceOf[TransactionResponse]
+      val partitions = txResponse.status.filter(_._2 == ErrorMapping.NoError).keys.toSeq
+      val delayedCommit = commitRequestMap.get(txResponse.txId)
+
+      if (delayedCommit != null) {
+        if (delayedCommit.checkSatisfied(partitions)) {
+          if (delayedCommit.satisfied.compareAndSet(false, true))
+            delayedCommit.respond()
+        }
+      } else {
+        info("txId %d not found in commitRequestMap".format(txResponse.txId))
+      }
+    }
+  }
+
   class DelayedProduce(keys: Seq[RequestKey],
                        request: RequestChannel.Request,
                        val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus],
                        produce: ProducerRequest,
                        delayMs: Long,
-                       offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
+                       offsetCommitRequestOpt: Option[OffsetCommitRequest] = None,
+                       transactionRequestOpt: Option[TransactionRequest] = None)
     extends DelayedRequest(keys, request, delayMs) with Logging {
 
     // first update the acks pending variable according to the error code
@@ -725,6 +938,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
     }
 
+    def watchAndSendRequests(txRequest: TransactionRequest) {
+      val delayedCommit = new DelayedCommit(txRequest)
+      commitRequestPurgatory.watch(txRequest.requestInfo.txId, delayedCommit)
+      sendTxRequestToBrokers(txRequest)
+    }
+
     def respond() {
       val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
         topicAndPartition -> delayedStatus.status
@@ -735,14 +954,32 @@ class KafkaApis(val requestChannel: RequestChannel,
       }.map(_._2.error).getOrElse(ErrorMapping.NoError)
 
       if (errorCode == ErrorMapping.NoError) {
-        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
+        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
+        transactionRequestOpt.foreach(txRequest =>
+          partitionStatus.foreach{ case (topicAndPartition, delayedStatus) =>
+            transactionManager.recordPendingRequest(txRequest, delayedStatus.requiredOffset)})
       }
 
-      val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, config.offsetMetadataMaxSize))
-        .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
+      val response: RequestOrResponse =
+        if (offsetCommitRequestOpt != None) offsetCommitRequestOpt.get.responseFor(errorCode, config.offsetMetadataMaxSize)
+        else if (transactionRequestOpt != None) transactionRequestOpt.get.responseFor(responseStatus.mapValues(_.error))
+        else ProducerResponse(produce.correlationId, responseStatus)
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
+
+      // For preCommit/preAbort, forward commit/abort to partitions involved in the transaction
+      if (errorCode == ErrorMapping.NoError) {
+        transactionRequestOpt.foreach(txRequest => txRequest.requestInfo.txControl match {
+          case TxControlTypes.PreCommit =>
+            val newTxRequest = TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Commit)
+            watchAndSendRequests(newTxRequest)
+          case TxControlTypes.PreAbort =>
+            val newTxRequest = TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Abort)
+            watchAndSendRequests(newTxRequest)
+          case _ =>
+        }
+      )}
     }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ef75b67..1e32707 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -291,4 +291,20 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off */
   val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
 
+  /*********** Transaction management configuration ***********/
+
+  /** The number of partitions for the transaction topic (should not change after deployment). */
+  val transactionsTopicPartitions: Int = props.getIntInRange("transaction.topic.num.partitions",
+    TransactionManagerConfig.DefaultTransactionsTopicNumPartitions, (1, Integer.MAX_VALUE))
+
+  /** The replication factor for the transaction topic (set higher to ensure availability). */
+  val transactionsTopicReplicationFactor: Short = props.getShortInRange("transaction.topic.replication.factor",
+    TransactionManagerConfig.DefaultTransactionsTopicReplicationFactor, (1, Short.MaxValue))
+
+  /** Compression codec for the transaction topic. */
+  val transactionsTopicCompressionCodec = props.getCompressionCodec("transaction.topic.compression.codec",
+    TransactionManagerConfig.DefaultTransactionsTopicCompressionCodec)
+
+  /* the frequency with which we update the persistent record of the last pending txControl, which acts as the transaction recovery point */
+  val transactionFlushOffsetCheckpointIntervalMs = props.getIntInRange("transaction.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue))
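+
+  /*
+   * Illustrative server.properties overrides for the settings above (the values are examples only):
+   *   transaction.topic.num.partitions=50
+   *   transaction.topic.replication.factor=3
+   *   transaction.flush.offset.checkpoint.interval.ms=60000
+   */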
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c22e51e..3c5022d 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -49,6 +49,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var requestHandlerPool: KafkaRequestHandlerPool = null
   var logManager: LogManager = null
   var offsetManager: OffsetManager = null
+  var transactionManager: TransactionManager = null
   var kafkaHealthcheck: KafkaHealthcheck = null
   var topicConfigManager: TopicConfigManager = null
   var replicaManager: ReplicaManager = null
@@ -100,9 +101,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         offsetManager = createOffsetManager()
 
         kafkaController = new KafkaController(config, zkClient, brokerState)
+
+        /* start transaction manager */
+        transactionManager = createTransactionManager()
+
+        transactionManager.startup()
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, transactionManager,
+          zkClient, config.brokerId, config, kafkaController)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
         brokerState.newState(RunningAsBroker)
@@ -256,6 +263,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
               Utils.swallow(logManager.shutdown())
             if(kafkaController != null)
               Utils.swallow(kafkaController.shutdown())
+            if(transactionManager != null)
+              Utils.swallow(transactionManager.shutdown())
             if(zkClient != null)
               Utils.swallow(zkClient.close())
 
@@ -310,6 +319,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       time = time)
   }
 
+  private def createTransactionManager(): TransactionManager = {
+    val transactionConfig = TransactionManagerConfig(
+      transactionsTopicNumPartitions = config.transactionsTopicPartitions)
+    new TransactionManager(config = transactionConfig,
+      controller = kafkaController,
+      scheduler = kafkaScheduler,
+      flushCheckpointMs = config.transactionFlushOffsetCheckpointIntervalMs,
+      zkClient = zkClient)
+  }
+
   private def createOffsetManager(): OffsetManager = {
     val offsetManagerConfig = OffsetManagerConfig(
       maxMetadataSize = config.offsetMetadataMaxSize,
diff --git a/core/src/main/scala/kafka/server/TransactionManager.scala b/core/src/main/scala/kafka/server/TransactionManager.scala
new file mode 100644
index 0000000..12d7c74
--- /dev/null
+++ b/core/src/main/scala/kafka/server/TransactionManager.scala
@@ -0,0 +1,219 @@
+package kafka.server
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
+import collection.JavaConversions._
+import kafka.utils.Utils._
+import kafka.api._
+import kafka.controller.{ControllerChannelManager, KafkaController}
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import org.apache.kafka.common.protocol.types._
+import org.apache.kafka.common.protocol.types.Type._
+import kafka.common.KafkaException
+import java.nio.ByteBuffer
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import kafka.utils.{Logging, Scheduler, ZkUtils, Utils}
+import kafka.message.NoCompressionCodec
+
+case class TransactionManagerConfig(transactionsTopicNumPartitions: Int)
+
+object TransactionManagerConfig {
+  val DefaultTransactionsTopicNumPartitions = 1
+  val DefaultTransactionsTopicCompressionCodec = NoCompressionCodec
+  val DefaultTransactionsIdBatchSize = 1000
+  val DefaultTransactionsTopicReplicationFactor = 1.toShort
+}
+
+case class TransactionRequestAndOffset(txRequest: TransactionRequest, nextOffset: Long)
+
+class TransactionManager(val config: TransactionManagerConfig,
+                         val controller: KafkaController,
+                         scheduler: Scheduler,
+                         val flushCheckpointMs: Int,
+                         val zkClient: ZkClient = null) {
+
+  var nextTxId = -1
+  var txIdBatchEnd = -1
+  val InitialTaskDelayMs = 30*1000
+
+  private var channelManager: ControllerChannelManager = null
+  private val hasStarted = new AtomicBoolean(false)
+  private val channelLock: ReentrantLock = new ReentrantLock()
+  private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
+
+  private val lock = new Object
+  private val recoveryPoints = mutable.Map[Int, Long]()
+  // The set of txRequests which have not been acknowledged by brokers involved in the transaction
+  private val requestSets = new mutable.HashMap[Int, mutable.HashSet[TransactionRequest]]
+  // This queue maintains the earliest not-yet-acknowledged txRequest and its next offset on the log
+  private val requestAndOffsetQueues = new mutable.HashMap[Int, mutable.Queue[TransactionRequestAndOffset]]
+
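+  /*
+   * For illustration: with the default transaction.flush.offset.checkpoint.interval.ms of 60000,
+   * the recovery points collected above are written to ZooKeeper (under /transaction/recovery_offsets)
+   * once a minute, starting 30 seconds after startup.
+   */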
+  def startup() {
+    /* Checkpoint high watermark for transaction topicPartition in zookeeper */
+    if(scheduler != null) {
+      scheduler.schedule("kafka-recovery-point-checkpoint",
+                         checkpointRecoveryPointOffsets,
+                         delay = InitialTaskDelayMs,
+                         period = flushCheckpointMs,
+                         TimeUnit.MILLISECONDS)
+    }
+    channelManager = new ControllerChannelManager(controller.controllerContext, controller.config)
+    channelManager.startup()
+    registerBrokerChangeListener()
+    hasStarted.set(true)
+  }
+
+  def shutdown() {
+    hasStarted.set(false)
+    if(channelManager != null) {
+      channelManager.shutdown()
+      channelManager = null
+    }
+  }
+
+  private def registerBrokerChangeListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener())
+  }
+
+  /**
+   * This is the zookeeper listener that keeps the set of live brokers, and hence the channel
+   * manager's connections, up to date as brokers come and go.
+   */
+  class BrokerChangeListener() extends IZkChildListener with Logging {
+    def handleChildChange(parentPath: String, currentBrokerList: java.util.List[String]) {
+      inLock(channelLock) {
+        if (hasStarted.get) {
+          try {
+            val curBrokerIds = currentBrokerList.map(_.toInt).toSet
+            val newBrokerIds = curBrokerIds -- liveBrokerIdsUnderlying
+            val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
+            val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
+            val deadBrokerIds = liveBrokerIdsUnderlying -- curBrokerIds
+            val liveBrokersUnderlying = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+            liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id)
+
+            newBrokers.foreach(channelManager.addBroker(_))
+            deadBrokerIds.foreach(channelManager.removeBroker(_))
+          } catch {
+            case e: Throwable => error("Error while handling broker changes", e)
+          }
+        }
+      }
+    }
+  }
+
+  def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
+    channelManager.sendRequest(brokerId, request, callback)
+  }
+
+  def checkpointRecoveryPointOffsets() {
+    ZkUtils.updateTransactionRecoveryOffset(zkClient, recoveryPoints)
+  }
+
+  def recordPendingRequest(txRequest: TransactionRequest, nextOffset: Long) {
+    if (txRequest.requestInfo.txControl == TxControlTypes.PreCommit ||
+        txRequest.requestInfo.txControl == TxControlTypes.PreAbort) {
+      val partition = partitionFor(txRequest.requestInfo.txGroupId)
+      lock synchronized {
+        if (!requestAndOffsetQueues.contains(partition)) {
+          recoveryPoints.put(partition, -1)
+          requestSets.put(partition, new mutable.HashSet[TransactionRequest]())
+          requestAndOffsetQueues.put(partition, new mutable.Queue[TransactionRequestAndOffset]())
+        }
+        requestSets(partition) += txRequest
+        requestAndOffsetQueues(partition) += TransactionRequestAndOffset(txRequest, nextOffset)
+      }
+    }
+  }
+
+  def checkpointTransactionHW(txRequest: TransactionRequest) {
+    val origTxRequest = txRequest.requestInfo.txControl match {
+      case TxControlTypes.Commit => TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.PreCommit)
+      case TxControlTypes.Abort => TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.PreAbort)
+    }
+
+    val partition = partitionFor(txRequest.requestInfo.txGroupId)
+    lock synchronized {
+      requestSets(partition) -= origTxRequest
+
+      // advance the recovery point past the longest prefix of acknowledged requests,
+      // removing those entries from the queue as we go
+      val queue = requestAndOffsetQueues(partition)
+      while (queue.nonEmpty && !requestSets(partition).contains(queue.front.txRequest)) {
+        recoveryPoints(partition) = queue.dequeue().nextOffset
+      }
+    }
+  }
+
+  def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % config.transactionsTopicNumPartitions
+
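+  /*
+   * Transaction ids are handed out in batches backed by a ZooKeeper version counter. For
+   * illustration, if ZkUtils.getNewTransactionIdBatch returns batch 7, this broker serves ids
+   * 7000 through 7999 (with the default batch size of 1000) before fetching another batch.
+   */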
+ value.set("txid", txRequestInfo.txId) + value.set("txcontrol", txRequestInfo.txControl) + value.set("txtimeout", txRequestInfo.txTimeoutMs) + + val topicAndPartitions = ArrayBuffer[Struct]() + for (tp <- txRequestInfo.txPartitions) { + val topicAndPartition = value.instance("txpartitions"); + topicAndPartition.set("topic", tp.topic) + topicAndPartition.set("partition", tp.partition) + topicAndPartitions += topicAndPartition + } + value.set("txpartitions", topicAndPartitions.toArray) + + val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) + byteBuffer.putShort(CURRENT_TRANSACTION_SCHEMA_VERSION) + value.writeTo(byteBuffer) + byteBuffer.array() + ("(value:" + txRequestInfo.toString + ")").getBytes + } +} diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index dcdc1ce..f7e4073 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -46,6 +46,8 @@ object ZkUtils extends Logging { val ReassignPartitionsPath = "/admin/reassign_partitions" val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" + val transactionIdPath = "/transaction/ids" + val transactionRecoveryPath = "/transaction/recovery_offsets" def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic @@ -61,6 +63,9 @@ object ZkUtils extends Logging { def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic + def getTransactionPartitionRecoveryPath(partition: Int): String = + transactionRecoveryPath + "/" + partition + def getController(zkClient: ZkClient): Int = { readDataMaybeNull(zkClient, ControllerPath)._1 match { case Some(controller) => KafkaController.parseControllerId(controller) @@ -617,6 +622,41 @@ object ZkUtils extends Logging { "replicas" -> e._2)))) } + def getTransactionRecoveryOffset(zkClient: ZkClient): immutable.Map[Int, Long] = { + val partitions = try { + getChildren(zkClient, transactionRecoveryPath) + } catch { + case nne: ZkNoNodeException => + zkClient.createPersistent(transactionRecoveryPath, true) + debug("Created path %s for transaction partition recovery".format(transactionRecoveryPath)) + Seq.empty[String] + case e2: Throwable => throw new AdminOperationException(e2.toString) + } + + val recoveryPoints = partitions.map { partition => { + val zkPath = getTransactionPartitionRecoveryPath(partition.toInt) + val data = readData(zkClient, zkPath)._1 + (partition.toInt, data.toLong) + }}.toMap + recoveryPoints + } + + def updateTransactionRecoveryOffset(zkClient: ZkClient, recoveryPoints: mutable.Map[Int, Long]) { + recoveryPoints.foreach{ case (partition: Int, recoveryPoint: Long) => { + val zkPath = getTransactionPartitionRecoveryPath(partition) + val data = recoveryPoint.toString + try { + updatePersistentPath(zkClient, zkPath, data) + info("Updated transaction partition %s with recovery offset %s".format(partition, recoveryPoint)) + } catch { + case nne: ZkNoNodeException => + ZkUtils.createPersistentPath(zkClient, zkPath, data) + debug("Created path %s with %s for transaction partition recovery".format(zkPath, recoveryPoint)) + case e2: Throwable => throw new AdminOperationException(e2.toString) + } + }} + } + def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) { val zkPath = ZkUtils.ReassignPartitionsPath partitionsToBeReassigned.size match { @@ -708,6 +748,26 @@ object ZkUtils extends Logging { }.flatten.toSet 
+
+  // map of versions to schemas
+  private val TRANSACTION_SCHEMAS = Map(0 -> KeyAndValueSchemas(TRANSACTION_KEY_SCHEMA_V0, TRANSACTION_VALUE_SCHEMA_V0))
+
+  private val CURRENT_SCHEMA = schemaFor(CURRENT_TRANSACTION_SCHEMA_VERSION)
+
+  private def schemaFor(version: Int) = {
+    val schemaOpt = TRANSACTION_SCHEMAS.get(version)
+    schemaOpt match {
+      case Some(schema) => schema
+      case _ => throw new KafkaException("Unknown transaction schema version " + version)
+    }
+  }
+
+  def transactionControlValue(txRequestInfo: TransactionRequestInfo): Array[Byte] = {
+    val value = new Struct(CURRENT_SCHEMA.valueSchema)
+    value.set("groupid", txRequestInfo.txGroupId)
+    value.set("txid", txRequestInfo.txId)
+    value.set("txcontrol", txRequestInfo.txControl)
+    value.set("txtimeout", txRequestInfo.txTimeoutMs)
+
+    val topicAndPartitions = ArrayBuffer[Struct]()
+    for (tp <- txRequestInfo.txPartitions) {
+      val topicAndPartition = value.instance("txpartitions")
+      topicAndPartition.set("topic", tp.topic)
+      topicAndPartition.set("partition", tp.partition)
+      topicAndPartitions += topicAndPartition
+    }
+    value.set("txpartitions", topicAndPartitions.toArray)
+
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
+    byteBuffer.putShort(CURRENT_TRANSACTION_SCHEMA_VERSION)
+    value.writeTo(byteBuffer)
+    byteBuffer.array()
+    ("(value:" + txRequestInfo.toString + ")").getBytes
+  }
+}
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index dcdc1ce..f7e4073 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -46,6 +46,8 @@ object ZkUtils extends Logging {
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val DeleteTopicsPath = "/admin/delete_topics"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
+  val transactionIdPath = "/transaction/ids"
+  val transactionRecoveryPath = "/transaction/recovery_offsets"
 
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
@@ -61,6 +63,9 @@ object ZkUtils extends Logging {
   def getDeleteTopicPath(topic: String): String =
     DeleteTopicsPath + "/" + topic
 
+  def getTransactionPartitionRecoveryPath(partition: Int): String =
+    transactionRecoveryPath + "/" + partition
+
   def getController(zkClient: ZkClient): Int = {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
       case Some(controller) => KafkaController.parseControllerId(controller)
@@ -617,6 +622,41 @@ object ZkUtils extends Logging {
                                                      "replicas" -> e._2))))
   }
 
+  def getTransactionRecoveryOffset(zkClient: ZkClient): immutable.Map[Int, Long] = {
+    val partitions = try {
+      getChildren(zkClient, transactionRecoveryPath)
+    } catch {
+      case nne: ZkNoNodeException =>
+        zkClient.createPersistent(transactionRecoveryPath, true)
+        debug("Created path %s for transaction partition recovery".format(transactionRecoveryPath))
+        Seq.empty[String]
+      case e2: Throwable => throw new AdminOperationException(e2.toString)
+    }
+
+    val recoveryPoints = partitions.map { partition => {
+      val zkPath = getTransactionPartitionRecoveryPath(partition.toInt)
+      val data = readData(zkClient, zkPath)._1
+      (partition.toInt, data.toLong)
+    }}.toMap
+    recoveryPoints
+  }
+
+  def updateTransactionRecoveryOffset(zkClient: ZkClient, recoveryPoints: mutable.Map[Int, Long]) {
+    recoveryPoints.foreach{ case (partition: Int, recoveryPoint: Long) => {
+      val zkPath = getTransactionPartitionRecoveryPath(partition)
+      val data = recoveryPoint.toString
+      try {
+        updatePersistentPath(zkClient, zkPath, data)
+        info("Updated transaction partition %s with recovery offset %s".format(partition, recoveryPoint))
+      } catch {
+        case nne: ZkNoNodeException =>
+          ZkUtils.createPersistentPath(zkClient, zkPath, data)
+          debug("Created path %s with %s for transaction partition recovery".format(zkPath, recoveryPoint))
+        case e2: Throwable => throw new AdminOperationException(e2.toString)
+      }
+    }}
+  }
+
   def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
     val zkPath = ZkUtils.ReassignPartitionsPath
     partitionsToBeReassigned.size match {
@@ -708,6 +748,26 @@ object ZkUtils extends Logging {
       }.flatten.toSet
     }
   }
+
+  def getNewTransactionIdBatch(zkClient: ZkClient): Int = {
+    try {
+      val stat = zkClient.writeDataReturnStat(transactionIdPath, "", -1)
+      stat.getVersion
+    } catch {
+      case e: ZkNoNodeException => {
+        createParentPath(zkClient, transactionIdPath)
+        try {
+          zkClient.createPersistent(transactionIdPath, "")
+        } catch {
+          case e: ZkNodeExistsException =>
+          case e2: Throwable => throw e2
+        }
+        val stat = zkClient.writeDataReturnStat(transactionIdPath, "", -1)
+        stat.getVersion
+      }
+      case e2: Throwable => throw e2
+    }
+  }
 }
 
 object ZKStringSerializer extends ZkSerializer {