diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 6fe7573..48987bb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -17,9 +17,6 @@ package org.apache.kafka.common.protocol; -import java.util.ArrayList; -import java.util.List; - /** * Identifiers for all the Kafka APIs */ @@ -31,7 +28,10 @@ public enum ApiKeys { LEADER_AND_ISR(4, "leader_and_isr"), STOP_REPLICA(5, "stop_replica"), OFFSET_COMMIT(6, "offset_commit"), - OFFSET_FETCH(7, "offset_fetch"); + OFFSET_FETCH(7, "offset_fetch"), + + TX(11, "transaction"), + TX_COORDINATOR(12, "tx_coordinator"); private static ApiKeys[] codeToType; public static int MAX_API_KEY = -1; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 3374bd9..efc73f5 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge; import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TransactionCoordinatorNotAvailableException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -35,7 +36,7 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; /** * This class contains all the client-server errors--those errors that must be sent from the server to the client. These * are thus part of the protocol. The names can be changed but the error code cannot. 
- * + * Do not add exceptions that occur only on the client or only on the server here. */ public enum Errors { @@ -51,7 +52,8 @@ public enum Errors { // TODO: errorCode 8, 9, 11 MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), - NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")); + NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")), + TRANSACTION_COORDINATOR_NOT_AVAILABLE(17, new TransactionCoordinatorNotAvailableException("There is no tx coordinator available to serve transaction requests.")); private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>(); private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>(); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 044b030..df8642f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -76,8 +76,17 @@ public class Protocol { "Host and port information for all brokers."), new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0))); + public static Schema TX_COORDINATOR_REQUEST_V0 = new Schema(new Field("groupId", + STRING, + "A user specified identifier for the transaction group")); + + public static Schema TX_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error", INT16, "Error code, if any"), + new Field("broker", BROKER, "The broker information")); + public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 }; public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 }; + public static Schema[] TX_COORDINATOR_METADATA_REQUEST = new Schema[] { 
TX_COORDINATOR_REQUEST_V0 }; + public static Schema[] TX_COORDINATOR_METADATA_RESPONSE = new Schema[] { TX_COORDINATOR_RESPONSE_V0 }; /* Produce api */ @@ -100,10 +109,26 @@ public class Protocol { INT16), new Field("base_offset", INT64)))))))); - public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 }; public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 }; + /* Transaction api */ + + public static Schema TX_REQUEST_V0 = new Schema(new Field("ackTimeoutMs", INT32, "Time to await a response"), + new Field("groupId", STRING, "transaction group Id"), + new Field("txId", INT32, "System generated unique identifier for transaction"), + new Field("txControl", INT16, "Field to indicate type of transaction request"), + new Field("txRequestTimeoutMs", INT32, "Timeout for this transaction (i.e. the transaction should finish within this time)"), + new Field("txPartitions", new ArrayOf(new Schema(new Field("topic", STRING), + new Field("partition", new ArrayOf(INT32)))))); + + public static Schema TX_RESPONSE_V0 = new Schema(new Field("txId", INT32, "System generated unique identifier for transaction"), + new Field("error", INT16, "Identify error")); + + + public static Schema[] TRANSACTION_REQUEST = new Schema[] { TX_REQUEST_V0 }; + public static Schema[] TRANSACTION_RESPONSE = new Schema[] { TX_RESPONSE_V0 }; + /* an array of all requests and responses with all schema versions */ public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][]; @@ -120,6 +145,8 @@ public class Protocol { REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + REQUESTS[ApiKeys.TX.id] = TRANSACTION_REQUEST; + REQUESTS[ApiKeys.TX_COORDINATOR.id] = TX_COORDINATOR_METADATA_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; RESPONSES[ApiKeys.FETCH.id] = new 
Schema[] {}; @@ -129,6 +156,8 @@ public class Protocol { RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + RESPONSES[ApiKeys.TX.id] = TRANSACTION_RESPONSE; + RESPONSES[ApiKeys.TX_COORDINATOR.id] = TX_COORDINATOR_METADATA_RESPONSE; /* set the maximum version of each api */ for (ApiKeys api : ApiKeys.values()) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java new file mode 100644 index 0000000..969658b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import java.util.Map; + +import org.apache.kafka.clients.consumer.OffsetMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class OffsetCommitRequest { + + private final String groupId; + private final Map offsets; + + public OffsetCommitRequest(String groupId, Map offsets) { + this.groupId = groupId; + this.offsets = offsets; + } + + public Struct toStruct() { + Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id)); + body.set("groupId", groupId); + body.set("offsets", offsets); + return body; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataRequest.java new file mode 100644 index 0000000..406dc93 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataRequest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class TransactionCoordinatorMetadataRequest { + + private final String groupId; + + public TransactionCoordinatorMetadataRequest(String groupId) { + this.groupId = groupId; + } + + public Struct toStruct() { + Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.TX_COORDINATOR.id)); + body.set("groupId", groupId); + return body; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataResponse.java new file mode 100644 index 0000000..dadd214 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionCoordinatorMetadataResponse.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +public class TransactionCoordinatorMetadataResponse { + + private final Node node; + private final Errors error; + + public TransactionCoordinatorMetadataResponse(Node node, Errors error) { + this.node = node; + this.error = error; + } + + public Node getNode(){ + return node; + } + + public Errors getErrors(){ + return error; + } + + public TransactionCoordinatorMetadataResponse(Struct struct) { + short errorCode = struct.getShort("error"); + Errors error = Errors.forCode(errorCode); + Struct nodeStruct = (Struct) struct.get("broker"); + int id = nodeStruct.getInt("node_id"); + String host = nodeStruct.getString("host"); + int port = nodeStruct.getInt("port"); + this.node = new Node(id, host, port); + this.error = error; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionRequest.java new file mode 100644 index 0000000..1b66711 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionRequest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class TransactionRequest { + + private final int ackTimeoutMs; + private final String groupId; + private final int txId; + private final short txControl; + private final int txRequestTimeoutMs; + private final Set txPartitions; + + + public TransactionRequest(String groupId, int txId, short txControl, int txTimeoutMs, + int ackTimeoutMs, Set txPartitions) { + this.groupId = groupId; + this.txId = txId; + this.txControl = txControl; + this.txRequestTimeoutMs = txTimeoutMs; + this.ackTimeoutMs = ackTimeoutMs; + this.txPartitions = Collections.unmodifiableSet(txPartitions); + } + + public Struct toStruct() { + Struct txRequest = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.TX.id)); + txRequest.set("ackTimeoutMs", ackTimeoutMs); + txRequest.set("groupId", groupId); + txRequest.set("txId", txId); + txRequest.set("txControl", txControl); + txRequest.set("txRequestTimeoutMs", txRequestTimeoutMs); + + // topicParts.group_by(topic) + Map> partitionsGroupByTopic = new HashMap>(); + for (TopicPartition entry : txPartitions) { + if (partitionsGroupByTopic.containsKey(entry.topic())) { + partitionsGroupByTopic.get(entry.topic()).add(entry.partition()); + } else { + partitionsGroupByTopic.put(entry.topic(), new HashSet()); + partitionsGroupByTopic.get(entry.topic()).add(entry.partition()); + } + } + + List topicPartitionStruct = new ArrayList(partitionsGroupByTopic.size()); + for(String topic : 
partitionsGroupByTopic.keySet()) { + Struct topicPartition = txRequest.instance("txPartitions"); + topicPartition.set("topic", topic); + topicPartition.set("partition", partitionsGroupByTopic.get(topic).toArray()); + topicPartitionStruct.add(topicPartition); + } + + txRequest.set("txPartitions", topicPartitionStruct.toArray()); + return txRequest; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TransactionResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TransactionResponse.java new file mode 100644 index 0000000..92e8b72 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/TransactionResponse.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class TransactionResponse { + + private final int txId; + private final Errors error; + + public int getTxId(){ + return txId; + } + + public Errors getError(){ + return error; + } + + public TransactionResponse(int txId, Errors error) { + this.txId = txId; + this.error = error; + } + + public TransactionResponse(Struct struct){ + txId = struct.getInt("txId"); + short errorCode = struct.getShort("error"); + error = Errors.forCode(errorCode); + } + + public Struct toStruct() { + Struct txResponse = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.TX.id)); + txResponse.set("txId", txId); + txResponse.set("error", error); + return txResponse; + } +} diff --git a/config/log4j.properties b/config/log4j.properties index 9502254..9233796 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -63,7 +63,7 @@ log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender log4j.additivity.kafka.network.RequestChannel$=false #log4j.logger.kafka.network.Processor=TRACE, requestAppender -#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender #log4j.additivity.kafka.server.KafkaApis=false log4j.logger.kafka.request.logger=WARN, requestAppender log4j.additivity.kafka.request.logger=false diff --git a/config/tools-log4j.properties b/config/tools-log4j.properties index 52f07c9..759ae20 100644 --- a/config/tools-log4j.properties +++ b/config/tools-log4j.properties @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-log4j.rootLogger=WARN, stdout +log4j.rootLogger=TRACE, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 8d5c2e7..624e37f 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConversions._ import kafka.cluster.Broker import kafka.log.LogConfig import kafka.consumer.Whitelist -import kafka.server.OffsetManager +import kafka.server.{TransactionManager, OffsetManager} object TopicCommand { @@ -106,8 +106,8 @@ object TopicCommand { println("Updated config for topic \"%s\".".format(topic)) } if(opts.options.has(opts.partitionsOpt)) { - if (topic == OffsetManager.OffsetsTopicName) { - throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") + if (topic == OffsetManager.OffsetsTopicName || topic == TransactionManager.TransactionTopicName) { + throw new IllegalArgumentException("The number of partitions for the " + topic + " topic cannot be changed.") } println("WARNING: If partitions are increased for a topic that has a key, the partition " + "logic or ordering of the messages will be affected") diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index fbfc9d3..0ee6ea0 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -32,6 +32,8 @@ object RequestKeys { val OffsetCommitKey: Short = 8 val OffsetFetchKey: Short = 9 val ConsumerMetadataKey: Short = 10 + val TransactionKey: Short = 11 + val TxCoordinatorMetadataKey: Short = 12 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), @@ -44,7 +46,9 @@ object 
RequestKeys { ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom)) + ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom), + TransactionKey -> ("TransactionRequest", TransactionRequest.readFrom), + TxCoordinatorMetadataKey -> ("TxCoordinatorMetadata", TxCoordinatorMetadataRequest.readFrom)) def nameForKey(key: Short): String = { keyToNameAndDeserializerMap.get(key) match { diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 5559d26..b185421 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -46,6 +46,7 @@ object ErrorMapping { val OffsetsLoadInProgressCode: Short = 14 val ConsumerCoordinatorNotAvailableCode: Short = 15 val NotCoordinatorForConsumerCode: Short = 16 + val TxCoordinatorNotAvailableCode: Short = 17 private val exceptionToCode = Map[Class[Throwable], Short]( diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index ad75978..91d9052 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -18,7 +18,7 @@ package kafka.common import util.matching.Regex -import kafka.server.OffsetManager +import kafka.server.{TransactionManager, OffsetManager} object Topic { @@ -26,7 +26,7 @@ object Topic { private val maxNameLength = 255 private val rgx = new Regex(legalChars + "+") - val InternalTopics = Set(OffsetManager.OffsetsTopicName) + val InternalTopics = Set(OffsetManager.OffsetsTopicName, TransactionManager.TransactionTopicName) def validate(topic: String) { if (topic.length <= 0) diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 1cf2f62..1d41776 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -51,6 +51,7 @@ object ConsumerConfig extends Config { val ExcludeInternalTopics = true val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads" val DefaultClientId = "" + val ReadCommitted = false def validate(config: ConsumerConfig) { validateClientId(config.clientId) @@ -175,6 +176,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */ val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics) + /** Whether consumer should only return committed messages **/ + val readCommitted = props.getBoolean("read.committed", ReadCommitted) + validate(this) } diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 0e64632..9172d6d 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -18,10 +18,14 @@ package kafka.consumer import kafka.api._ +import kafka.message.{Message, ByteBufferMessageSet} import kafka.network._ import kafka.utils._ import kafka.common.{ErrorMapping, TopicAndPartition} +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + /** * A consumer of kafka messages */ @@ -39,6 +43,15 @@ class SimpleConsumer(val host: String, private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId) private var isClosed = false + /** Keeps pending transaction data in memory **/ + private val pendingTxData = new mutable.HashMap[(String, Int, Int), ListBuffer[Message]] + /** Maintains a list of transactions that can be consumed in order **/ + private val 
completedTx = new mutable.HashMap[(String, Int), mutable.Queue[ListBuffer[Message]]] + /** Keeps non-transactional messages per topic-partition **/ + private val nonTxMessages = new mutable.HashMap[(String, Int), ListBuffer[Message]] + /** Keeps last consumed offset per topic-partition **/ + private val lastConsumedOffset = new mutable.HashMap[(String, Int), Long]() + private def connect(): BlockingChannel = { close blockingChannel.connect() @@ -98,7 +111,7 @@ class SimpleConsumer(val host: String, } /** - * Fetch a set of messages from a topic. + * Fetch a set of messages from a topic * * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. * @return a set of fetched messages @@ -120,6 +133,198 @@ class SimpleConsumer(val host: String, } /** + * Fetch messages from a topic-partition. It will return all available messages (tx and non-tx) per topic-partition + * @param request + * @return + */ + def fetchTx(request: FetchRequest): Map[(String, Int), (Seq[Message], Long)] = { + var response: Receive = null + var reqBuilder = new FetchRequestBuilder() + var req = reqBuilder.build() + val reqData = new ListBuffer[(String, Int, Long, Int)] + val fetchSize = request.requestInfo.values.head.fetchSize + val availableData: mutable.Map[(String, Int), (Seq[Message], Long)] = new mutable.HashMap[(String, Int), (Seq[Message], Long)] + val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer + val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer + + var continueFetching = true + do { + aggregateTimer.time { + specificTimer.time { + response = sendRequest(request) + } + } + val fetchResponse = FetchResponse.readFrom(response.buffer) + val fetchedSize = fetchResponse.sizeInBytes + fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize) + 
fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) + + // iterate over byteBufferMessageSet and inspect messages + for (tp <- fetchResponse.data.keys) { + val topic = tp.topic + val partition = tp.partition + var offset: Long = 0 + for (msg <- fetchResponse.messageSet(topic, partition)) { + offset = msg.nextOffset + maybeUpdateLastConsumedOffset(offset, topic, partition) + lastConsumedOffset.put((topic, partition), offset) + val txId = msg.message.txId + debug("Message of transaction: " + txId) + if (txId != -1) { + // handle tx message + val txControl = getTxControl(msg.message.attributes) + debug("txControl: " + txControl) + txControl match { + case TxControl.Abort => + abortTransaction(topic, partition, txId) + case TxControl.Commit => + commitTransaction(topic, partition, txId) + case TxControl.Ongoing => + dataOf(topic, partition, txId, msg.message) + } + } + else { + // handle non-tx message + if(nonTxMessages.contains((topic, partition))) { + nonTxMessages((topic, partition)).append(msg.message) + } else { + val buffer = new ListBuffer[Message]() + buffer.append(msg.message) + nonTxMessages.put((topic, partition), buffer) + } + } + } + // incrementally create a potential next request + reqData.append((topic, partition, offset, fetchSize)) + } + + // check if there is data to return to application and delete from state if so + if(completedTx.size > 0 || nonTxMessages.size > 0) { + continueFetching = false + // populate with nontx messages + for ((topic, partition) <- nonTxMessages.keys) { + val offset = lastConsumedOffset((topic, partition)) + if (availableData.contains((topic, partition))) { + val messages = (availableData((topic, partition))_1) ++ nonTxMessages((topic, partition)) + availableData.put((topic, partition), (messages, offset)) + nonTxMessages.remove((topic, partition)) + } else { + availableData.put((topic, partition), (nonTxMessages((topic, partition)), offset)) + 
nonTxMessages.remove((topic, partition)) + } + } + // and then with tx data + for ((topic, partition) <- completedTx.keys) { + val offset = lastConsumedOffset((topic, partition)) + if (availableData.contains((topic, partition))) { + var txMsgs = new ListBuffer[Message]() + for(q <- completedTx((topic, partition))) txMsgs = txMsgs ++ q + val messages: Seq[Message] = (availableData((topic, partition))_1) ++ txMsgs + availableData.put((topic, partition), (messages, offset)) + } else { + var txMsgs =new ListBuffer[Message]() + for(q <- completedTx((topic, partition))) + txMsgs = txMsgs ++ q + availableData.put((topic, partition), (txMsgs, offset)) + } + completedTx.remove((topic, partition)) + } + } + + // create new fetchRequest if we continue fetching + if (continueFetching) { + for ((topic, partition, offset, fetchSize) <- reqData) { + reqBuilder = reqBuilder.addFetch(topic, partition, offset, fetchSize) + } + req = reqBuilder.build() + } + } while (continueFetching) + availableData.toMap + } + + /** + * This function updates latest consumed offset if necessary + * @param offset + * @param topic + * @param partition + * @return + */ + private def maybeUpdateLastConsumedOffset(offset: Long, topic: String, partition: Int) = { + if(lastConsumedOffset.contains((topic, partition))) { + if (offset > lastConsumedOffset((topic, partition))) { + lastConsumedOffset.put((topic, partition), offset) + } + } + } + + object TxControl { + val Ongoing: Short = 0 + val Commit: Short = 3 + val Abort: Short = 6 + } + + /** + * This function returns the txControl flag given the message attributes + * @param attr + * @return + */ + private def getTxControl(attr: Short) = { + // apply mask to attr and get correct txControl + val txControlMask = 0x07 << 3 + val txControlOffset = 3 + ((attr & txControlMask) >> txControlOffset).toShort + } + + /** + * This function will abort a transaction, i.e. 
clean all associated state + * @param topic + * @param partition + * @param txId + * @return + */ + private def abortTransaction(topic: String, partition: Int, txId: Int) = { + trace("Aborting transaction: " + txId) + pendingTxData.remove((topic, partition, txId)) + } + + /** + * This function will commit a transaction, moving its data to completedTx so that it can be returned to the application + * @param topic + * @param partition + * @param txId + * @return + */ + private def commitTransaction(topic: String, partition: Int, txId: Int) = { + trace("Committing transaction: " + txId) + if(completedTx.contains((topic, partition))) { + completedTx((topic, partition)).enqueue(pendingTxData((topic, partition, txId))) + } else { + val msgs = new mutable.Queue[ListBuffer[Message]]() + msgs.enqueue(pendingTxData((topic, partition, txId))) + completedTx.put((topic, partition), msgs) + } + pendingTxData.remove((topic, partition, txId)) + } + + /** + * This method accumulates data of a transaction + * @param topic + * @param partition + * @param txId + * @param message + * @return + */ + private def dataOf(topic: String, partition: Int, txId: Int, message: Message) = { + if(pendingTxData.contains((topic, partition, txId))) { + pendingTxData((topic, partition, txId)).append(message) + } else { + val buffer = new ListBuffer[Message]() + buffer.append(message) + pendingTxData.put((topic, partition, txId), buffer) + } + } + + /** * Get a list of valid offsets (up to maxSize) before the given time. * @param request a [[kafka.api.OffsetRequest]] object. * @return a [[kafka.api.OffsetResponse]] object. 
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 8763968..f578961 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -152,6 +152,8 @@ class RequestSendThread(val controllerId: Int, response = StopReplicaResponse.readFrom(receive.buffer) case RequestKeys.UpdateMetadataKey => response = UpdateMetadataResponse.readFrom(receive.buffer) + case RequestKeys.TransactionKey => + response = TransactionResponse.readFrom(receive.buffer) } stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s" .format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString())) diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index d2a7293..feb92ba 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -33,9 +33,11 @@ object Message { val CrcLength = 4 val MagicOffset = CrcOffset + CrcLength val MagicLength = 1 + val TxIdLength = 4 val AttributesOffset = MagicOffset + MagicLength val AttributesLength = 1 - val KeySizeOffset = AttributesOffset + AttributesLength + val TxIdOffset = AttributesOffset + AttributesLength + val KeySizeOffset = TxIdOffset + TxIdLength val KeySizeLength = 4 val KeyOffset = KeySizeOffset + KeySizeLength val ValueSizeLength = 4 @@ -46,7 +48,7 @@ object Message { /** * The minimum valid size for the message header */ - val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength + val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength + TxIdLength /** * The current "magic" value @@ -60,6 +62,16 @@ object Message { val CompressionCodeMask: Int = 0x07 /** + * Specifies the mask for 
the transaction control. 3 bits to hold the transaction control type. + * 0 is reserved to indicate no transaction control + */ + val TransactionControlMask: Int = 0x07 << 3 + + /** + * Specifies the offset for the transaction control within attributes. + */ + val TransactionControlOffset: Int = 3 + /** * Compression code for uncompressed messages */ val NoCompression: Int = 0 @@ -76,6 +88,7 @@ object Message { * 5. K byte key * 6. 4 byte payload length, containing length V * 7. V byte payload + * 8. 4 byte txId (NOTE: actually written between the attributes byte and the key size; see TxIdOffset) * * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents. */ @@ -91,27 +104,33 @@ class Message(val buffer: ByteBuffer) { * @param payloadOffset The offset into the payload array used to extract payload * @param payloadSize The size of the payload to use */ - def this(bytes: Array[Byte], - key: Array[Byte], - codec: CompressionCodec, - payloadOffset: Int, - payloadSize: Int) = { - this(ByteBuffer.allocate(Message.CrcLength + - Message.MagicLength + - Message.AttributesLength + - Message.KeySizeLength + - (if(key == null) 0 else key.length) + - Message.ValueSizeLength + - (if(bytes == null) 0 - else if(payloadSize >= 0) payloadSize - else bytes.length - payloadOffset))) + def this(bytes: Array[Byte], + key: Array[Byte], + codec: CompressionCodec, + payloadOffset: Int, + payloadSize: Int, + txId: Int, + txControl: Short) = { + this(ByteBuffer.allocate(Message.CrcLength + + Message.MagicLength + + Message.AttributesLength + + Message.KeySizeLength + + (if(key == null) 0 else key.length) + + Message.ValueSizeLength + + (if(bytes == null) 0 + else if(payloadSize >= 0) payloadSize + else bytes.length - payloadOffset) + + Message.TxIdLength)) // skip crc, we will fill that in at the end buffer.position(MagicOffset) buffer.put(CurrentMagicValue) var attributes: Byte = 0 - if (codec.codec > 0) - attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte + if (codec.codec > 0 || txControl > 0) + attributes = (attributes 
| (CompressionCodeMask & codec.codec) + | (TransactionControlMask & (txControl << TransactionControlOffset))).toByte + buffer.put(attributes) + buffer.putInt(txId) if(key == null) { buffer.putInt(-1) } else { @@ -130,22 +149,25 @@ class Message(val buffer: ByteBuffer) { Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum) } - def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) = - this(bytes = bytes, key = key, codec = codec, payloadOffset = 0, payloadSize = -1) + def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) = + this(bytes = bytes, key = key, codec = codec, payloadOffset = 0, payloadSize = -1, txId = -1, txControl = 0) def this(bytes: Array[Byte], codec: CompressionCodec) = this(bytes = bytes, key = null, codec = codec) - def this(bytes: Array[Byte], key: Array[Byte]) = + def this(bytes: Array[Byte], key: Array[Byte]) = this(bytes = bytes, key = key, codec = NoCompressionCodec) - + def this(bytes: Array[Byte]) = this(bytes = bytes, key = null, codec = NoCompressionCodec) - + + def this(bytes: Array[Byte], key: Array[Byte], txId: Int, txControl: Short) = + this(bytes = bytes, key = key, codec = NoCompressionCodec, payloadOffset = 0, payloadSize = -1, txId = txId, txControl = txControl) + /** * Compute the checksum of the message from the message contents */ - def computeChecksum(): Long = + def computeChecksum(): Long = Utils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset) /** @@ -211,7 +233,12 @@ class Message(val buffer: ByteBuffer) { */ def compressionCodec: CompressionCodec = CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask) - + + /** + * The transaction control used with this message + */ + def txControl: Short = ((buffer.get(AttributesOffset) & TransactionControlMask) >> TransactionControlOffset).toShort + /** * A ByteBuffer containing the content of the message */ @@ -223,6 +250,11 @@ class Message(val buffer: ByteBuffer) { def key: 
ByteBuffer = sliceDelimited(KeySizeOffset) /** + * A ByteBuffer containing the txId of the message + */ + def txId: Int = buffer.getInt(TxIdOffset) + + /** * Read a size-delimited byte buffer starting at the given offset */ private def sliceDelimited(start: Int): ByteBuffer = { diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala index b020793..d791b1c 100644 --- a/core/src/main/scala/kafka/producer/BaseProducer.scala +++ b/core/src/main/scala/kafka/producer/BaseProducer.scala @@ -17,15 +17,55 @@ package kafka.producer +import org.apache.kafka.clients.consumer.OffsetMetadata; +import org.apache.kafka.common.TopicPartition; + import java.util.Properties // A base producer used whenever we need to have options for both old and new producers; // this class will be removed once we fully rolled out 0.9 trait BaseProducer { + def begin() + def begin(txTimeoutMs: Int) + def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) + def abort() def send(topic: String, key: Array[Byte], value: Array[Byte]) def close() } +class NewTxProducer(producerProps: Properties) extends BaseProducer { + import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} + import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback + + val producer = new KafkaProducer(producerProps) + + override def send(topic: String, key: Array[Byte], value: Array[Byte]) { + val record = new ProducerRecord(topic, key, value) + this.producer.send(record, + new ErrorLoggingCallback(topic, key, value, false)) + } + + override def begin() { + this.producer.begin() + } + + override def begin(txTimeoutMs: Int) { + this.producer.begin(txTimeoutMs) + } + + override def abort() { + this.producer.abort() + } + + override def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) { + this.producer.commit(offsets) + } + + override def close() { + this.producer.close() + } +} + class 
NewShinyProducer(producerProps: Properties) extends BaseProducer { import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback @@ -45,6 +85,22 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer { } } + override def begin() { + + } + + override def begin(txTimeoutMs: Int) { + + } + + override def abort() { + + } + + override def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) { + + } + override def close() { this.producer.close() } @@ -62,6 +118,22 @@ class OldProducer(producerProps: Properties) extends BaseProducer { this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value)) } + override def begin() { + + } + + override def begin(txTimeoutMs: Int) { + + } + + override def abort() { + + } + + override def commit(offsets: java.util.Map[TopicPartition, OffsetMetadata]) { + + } + override def close() { this.producer.close() } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0b668f2..c0f5177 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -18,6 +18,7 @@ package kafka.server import kafka.api._ +import kafka.cluster.Broker import kafka.common._ import kafka.log._ import kafka.message._ @@ -27,24 +28,26 @@ import kafka.metrics.KafkaMetricsGroup import kafka.network.RequestChannel.Response import kafka.controller.KafkaController import kafka.utils.{Pool, SystemTime, Logging} - -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic._ import scala.collection._ - import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.common.KafkaException /** + * * Logic to handle the various Kafka requests */ class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, val 
offsetManager: OffsetManager, + val transactionManager: TransactionManager, val zkClient: ZkClient, val brokerId: Int, val config: KafkaConfig, val controller: KafkaController) extends Logging { + private val commitRequestPurgatory = new CommitRequestPurgatory private val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests) private val fetchRequestPurgatory = @@ -71,6 +74,8 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request) case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request) + case RequestKeys.TransactionKey => handleTransactionRequest(request) + case RequestKeys.TxCoordinatorMetadataKey => handleTxCoordinatorMetadataRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { @@ -143,6 +148,58 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def producerRequestFromTxRequestToBroker(txRequest: TransactionRequest): ProducerRequest = { + val byteBufferMessageSet =new ByteBufferMessageSet( + config.transactionsTopicCompressionCodec, + new Message( + bytes = TransactionManager.transactionControlValue(txRequest.requestInfo), + key = Array.empty[Byte], + txId = txRequest.requestInfo.txId, + txControl = txRequest.requestInfo.txControl)) + + val localPartitions = txRequest.requestInfo.txPartitions.filter(topicAndPartition => { + val partitionInfoOpt = metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition) + val localBrokerId = replicaManager.config.brokerId + partitionInfoOpt match { + case Some(partitionInfo) => partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == localBrokerId + case None => false + }}) + + val producerData = mutable.Map(localPartitions.map(topicAndPartition => topicAndPartition -> byteBufferMessageSet): _*) 
+ + val request = ProducerRequest( + correlationId = txRequest.correlationId, + clientId = txRequest.clientId, + requiredAcks = -1, + ackTimeoutMs = txRequest.ackTimeoutMs, + data = producerData) + trace("Created producer request %s for transaction request %s.".format(request, txRequest)) + request + } + + private def producerRequestFromTxRequestToCoordinator(txRequest: TransactionRequest) = { + val message = new Message( + bytes = TransactionManager.transactionControlValue(txRequest.requestInfo), + key = Array.empty[Byte], + txId = txRequest.requestInfo.txId, + txControl = txRequest.requestInfo.txControl) + + val partition = transactionManager.partitionFor(txRequest.requestInfo.txGroupId) + + val producerData = mutable.Map( + TopicAndPartition(TransactionManager.TransactionTopicName, partition) -> + new ByteBufferMessageSet(config.transactionsTopicCompressionCodec, message)) + + val request = ProducerRequest( + correlationId = txRequest.correlationId, + clientId = txRequest.clientId, + requiredAcks = -1, + ackTimeoutMs = txRequest.ackTimeoutMs, + data = producerData) + trace("Created producer request %s for transaction request %s.".format(request, txRequest)) + request + } + private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = { val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map { case (topicAndPartition, offset) => @@ -167,6 +224,83 @@ class KafkaApis(val requestChannel: RequestChannel, request } + def sendTxRequestToBrokers(txRequest: TransactionRequest) { + val partitionInfoOpts = txRequest.requestInfo.txPartitions.map ( topicAndPartition => + metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition)) + val brokers = partitionInfoOpts.filter(_ != None).map(_.get.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet + brokers.foreach(broker => + transactionManager.sendRequest(broker, txRequest, (response: RequestOrResponse) => commitRequestPurgatory.update(response)) 
+ ) + } + +/** + * 1. on receiving CoordinatorBegin message + * - append to transactionTopic + * - wait for ack from followers, and respond + * + * 2. on receiving PreCommit message + * - append to transactionTopic + * - wait for ack from followers, and respond + * - send Commit message to brokers + * - wait for ack from brokers, and append Committed to transactionTopic + * + * 3. on receiving Commit message + * - append to topicPartition + * - wait for ack from followers, and respond + * + * 4. on receiving Abort message + * - append to transactionTopic + * - wait for ack from followers, and resp + */ + + def handleTransactionRequest(request: RequestChannel.Request) { + var txRequest = request.requestObj.asInstanceOf[TransactionRequest] + val txControl = txRequest.requestInfo.txControl + if (txControl == TxControlTypes.Begin) { + val newTxId = transactionManager.getNextTransactionId + txRequest = txRequest.copy(requestInfo = txRequest.requestInfo.copy(txId = newTxId)) + } + val producerRequest = txControl match { + case TxControlTypes.Begin | TxControlTypes.PreCommit | TxControlTypes.PreAbort => + producerRequestFromTxRequestToCoordinator(txRequest) + case TxControlTypes.Commit | TxControlTypes.Abort => + producerRequestFromTxRequestToBroker(txRequest) + case _ => throw new KafkaException("Unhandled Transaction Control Key %i".format(txControl)) + } + val localProduceResults = appendToLocalLog(producerRequest) + + // create a list of (topic, partition) pairs to use as keys for this delayed request + val producerRequestKeys = producerRequest.data.keys.map( + topicAndPartition => new RequestKey(topicAndPartition)).toSeq + val statuses = localProduceResults.map(r => + r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap + val delayedRequest = new DelayedProduce( + producerRequestKeys, + request, + statuses, + producerRequest, + producerRequest.ackTimeoutMs.toLong, + None, + Some(txRequest)) + + 
producerRequestPurgatory.watch(delayedRequest) + + /* + * Replica fetch requests may have arrived (and potentially satisfied) + * delayedProduce requests while they were being added to the purgatory. + * Here, we explicitly check if any of them can be satisfied. + */ + var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] + producerRequestKeys.foreach(key => + satisfiedProduceRequests ++= + producerRequestPurgatory.update(key, key)) + debug(satisfiedProduceRequests.size + + " producer requests unblocked during produce to local log.") + satisfiedProduceRequests.foreach(_.respond()) + // we do not need the data anymore + producerRequest.emptyData() +} + /** * Handle a produce request or offset commit request (which is really a specialized producer request) */ @@ -238,7 +372,8 @@ class KafkaApis(val requestChannel: RequestChannel, statuses, produceRequest, produceRequest.ackTimeoutMs.toLong, - offsetCommitRequestOpt) + offsetCommitRequestOpt, + None) producerRequestPurgatory.watch(delayedRequest) @@ -559,7 +694,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (topics.size > 0 && topicResponses.size != topics.size) { val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => - if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) { + if (topic == OffsetManager.OffsetsTopicName || topic == TransactionManager.TransactionTopicName || config.autoCreateTopicsEnable) { try { if (topic == OffsetManager.OffsetsTopicName) { AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, @@ -567,6 +702,11 @@ class KafkaApis(val requestChannel: RequestChannel, info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" 
.format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor)) } + else if (topic == TransactionManager.TransactionTopicName) { + AdminUtils.createTopic(zkClient, topic, config.transactionsTopicPartitions, config.transactionsTopicReplicationFactor) + info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" + .format(topic, config.transactionsTopicPartitions, config.transactionsTopicReplicationFactor)) + } else { AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" @@ -611,6 +751,28 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } + + def handleTxCoordinatorMetadataRequest(request: RequestChannel.Request) { + val txCoordinatorMetadataRequest = request.requestObj.asInstanceOf[TxCoordinatorMetadataRequest] + val partition = transactionManager.partitionFor(txCoordinatorMetadataRequest.txGroupId) + // get metadata (and create the topic if necessary) + val transactionTopicMetadata = getTopicMetadata(Set(TransactionManager.TransactionTopicName)).head + + val errorResponse = TxCoordinatorMetadataResponse(None, ErrorMapping.TxCoordinatorNotAvailableCode, txCoordinatorMetadataRequest.correlationId) + + val response = + transactionTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata => + partitionMetadata.leader.map { leader => + TxCoordinatorMetadataResponse(Some(leader), ErrorMapping.NoError, txCoordinatorMetadataRequest.correlationId) + }.getOrElse(errorResponse) + }.getOrElse(errorResponse) + + trace("Sending transaction metadata %s for correlation id %d to client %s." 
+ .format(response, txCoordinatorMetadataRequest.correlationId, txCoordinatorMetadataRequest.clientId)) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } + + /* * Service the consumer metadata API */ @@ -704,12 +866,63 @@ class KafkaApis(val requestChannel: RequestChannel, } } + class DelayedCommit(val txRequest: TransactionRequest) { + val satisfied = new AtomicBoolean(false) + val partitions = new mutable.HashSet[TopicAndPartition] + partitions ++= txRequest.requestInfo.txPartitions + + def checkSatisfied(ackedPartitions: Seq[TopicAndPartition]) = { + this synchronized { + partitions --= ackedPartitions + partitions.isEmpty + } + } + + def respond() { + val txRequestToLog = txRequest.requestInfo.txControl match { + case TxControlTypes.Commit => + TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Committed) + case TxControlTypes.Abort => + TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Aborted) + } + + val producerRequest = producerRequestFromTxRequestToCoordinator(txRequestToLog) + appendToLocalLog(producerRequest) + transactionManager.checkpointTransactionHW(txRequest) + } + } + + class CommitRequestPurgatory() { + private val commitRequestMap = new ConcurrentHashMap[Int, DelayedCommit]() + + def watch(txId: Int, delayed: DelayedCommit) { + assert(!commitRequestMap.contains(txId)) + commitRequestMap.put(txId, delayed) + } + + def update(response: RequestOrResponse) { + val txResponse = response.asInstanceOf[TransactionResponse] + val partitions = txResponse.status.filter(_._2 == ErrorMapping.NoError).keys.toSeq + val delayedCommit = commitRequestMap.get(txResponse.txId) + + if (delayedCommit != null) { + if (delayedCommit.checkSatisfied(partitions)) { + if (delayedCommit.satisfied.compareAndSet(false, true)) + delayedCommit.respond() + } + } else { + info("txId %i not found in commitRequestMap".format(txResponse.txId)) + } + } + } + class DelayedProduce(keys: 
Seq[RequestKey], request: RequestChannel.Request, val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus], produce: ProducerRequest, delayMs: Long, - offsetCommitRequestOpt: Option[OffsetCommitRequest] = None) + offsetCommitRequestOpt: Option[OffsetCommitRequest] = None, + transactionRequestOpt : Option[TransactionRequest] = None) extends DelayedRequest(keys, request, delayMs) with Logging { // first update the acks pending variable according to the error code @@ -725,6 +938,12 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus)) } + def watchAndSendRequests(txRequest: TransactionRequest) { + val delayedCommit = new DelayedCommit(txRequest) + commitRequestPurgatory.watch(txRequest.requestInfo.txId, delayedCommit) + sendTxRequestToBrokers(txRequest) + } + def respond() { val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) => topicAndPartition -> delayedStatus.status @@ -735,14 +954,32 @@ class KafkaApis(val requestChannel: RequestChannel, }.map(_._2.error).getOrElse(ErrorMapping.NoError) if (errorCode == ErrorMapping.NoError) { - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) + offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo)) + transactionRequestOpt.foreach(txRequest => + partitionStatus.foreach{ case (topicAndPartition, delayedStatus) => + transactionManager.recordPendingRequest(txRequest, delayedStatus.requiredOffset)}) } - val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, config.offsetMetadataMaxSize)) - .getOrElse(ProducerResponse(produce.correlationId, responseStatus)) + val response: RequestOrResponse = + if (offsetCommitRequestOpt != None) offsetCommitRequestOpt.get.responseFor(errorCode, config.offsetMetadataMaxSize) + else if (transactionRequestOpt != None) 
transactionRequestOpt.get.responseFor(responseStatus.mapValues(_.error)) + else ProducerResponse(produce.correlationId, responseStatus) requestChannel.sendResponse(new RequestChannel.Response( request, new BoundedByteBufferSend(response))) + + // For preCommit/preAbort, forward commit/abort to partitions involved in the transaction + if (errorCode == ErrorMapping.NoError) { + transactionRequestOpt.foreach(txRequest => txRequest.requestInfo.txControl match { + case TxControlTypes.PreCommit => + val newTxRequest = TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Commit) + watchAndSendRequests(newTxRequest) + case TxControlTypes.PreAbort => + val newTxRequest = TransactionRequest.TransactionWithNewControl(txRequest, TxControlTypes.Abort) + watchAndSendRequests(newTxRequest) + case _ => + } + )} } /** diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ef75b67..1e32707 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -291,4 +291,20 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off */ val deleteTopicEnable = props.getBoolean("delete.topic.enable", false) + /*********** Transaction management configuration ***********/ + + /** The number of partitions for the transaction topic (should not change after deployment). */ + val transactionsTopicPartitions: Int = props.getIntInRange("transaction.topic.num.partitions", + TransactionManagerConfig.DefaultTransactionsTopicNumPartitions, (1, Integer.MAX_VALUE)) + + /** The replication factor for the transaction topic (set higher to ensure availability). 
*/ + val transactionsTopicReplicationFactor: Short = props.getShortInRange("transaction.topic.replication.factor", + TransactionManagerConfig.DefaultTransactionsTopicReplicationFactor, (1, Short.MaxValue)) + + /** Compression codec for the transaction topic. */ + val transactionsTopicCompressionCodec = props.getCompressionCodec("transaction.topic.compression.codec", + TransactionManagerConfig.DefaultTransactionsTopicCompressionCodec) + + /* the frequency with which we update the persistent record of the last pending txControl which acts as the transaction recovery point */ + val transactionFlushOffsetCheckpointIntervalMs = props.getIntInRange("transaction.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue)) } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c22e51e..3c5022d 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -49,6 +49,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null var offsetManager: OffsetManager = null + var transactionManager: TransactionManager = null var kafkaHealthcheck: KafkaHealthcheck = null var topicConfigManager: TopicConfigManager = null var replicaManager: ReplicaManager = null @@ -100,9 +101,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg offsetManager = createOffsetManager() kafkaController = new KafkaController(config, zkClient, brokerState) + + /* start transaction manager */ + transactionManager = createTransactionManager() + + transactionManager.startup() /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) + apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, 
transactionManager, + zkClient, config.brokerId, config, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) @@ -256,6 +263,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(logManager.shutdown()) if(kafkaController != null) Utils.swallow(kafkaController.shutdown()) + if(transactionManager != null) + Utils.swallow(transactionManager.shutdown()) if(zkClient != null) Utils.swallow(zkClient.close()) @@ -310,6 +319,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg time = time) } + private def createTransactionManager(): TransactionManager = { + val transactionConfig = TransactionManagerConfig( + transactionsTopicNumPartitions = config.transactionsTopicPartitions) + new TransactionManager(config = transactionConfig, + controller = kafkaController, + scheduler = kafkaScheduler, + flushCheckpointMs = config.transactionFlushOffsetCheckpointIntervalMs, + zkClient = zkClient) + } + private def createOffsetManager(): OffsetManager = { val offsetManagerConfig = OffsetManagerConfig( maxMetadataSize = config.offsetMetadataMaxSize, diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 6daf87b..54f6093 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -141,8 +141,8 @@ object DumpLogSegments { lastOffset = messageAndOffset.offset print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid + - " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + - " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum) + " payloadsize: " + msg.payloadSize + " magic: " + msg.magic + " txId: " + msg.txId + + " txControl: " + msg.txControl + " compresscodec: " + 
msg.compressionCodec + " crc: " + msg.checksum) if(msg.hasKey) print(" keysize: " + msg.keySize) if(printContents) { diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala index 7602b8d..0ff3d06 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala @@ -59,27 +59,45 @@ object SimpleConsumerPerformance { var lastReportTime: Long = startMs var lastBytesRead = 0L var lastMessagesRead = 0L + if(! config.readCommitted) + println("Simple Consumer configured to READ UNCOMMITTED") + else + println("Simple Consumer configured to READ COMMITTED") + while(!done) { // TODO: add in the maxWait and minBytes for performance val request = new FetchRequestBuilder() .clientId(config.clientId) .addFetch(config.topic, config.partition, offset, config.fetchSize) .build() - val fetchResponse = consumer.fetch(request) var messagesRead = 0 var bytesRead = 0 - val messageSet = fetchResponse.messageSet(config.topic, config.partition) - for (message <- messageSet) { - messagesRead += 1 - bytesRead += message.message.payloadSize - } - - if(messagesRead == 0 || totalMessagesRead > config.numMessages) - done = true - else + if(! 
config.readCommitted) { + val fetchResponse = consumer.fetch(request) + val messageSet = fetchResponse.messageSet(config.topic, config.partition) + for (message <- messageSet) { + messagesRead += 1 + bytesRead += message.message.payloadSize + } + if(messagesRead == 0 || totalMessagesRead > config.numMessages) + done = true + else // we only did one fetch so we find the offset for the first (head) messageset - offset += messageSet.validBytes + offset += messageSet.validBytes + } else if (config.readCommitted) { + val fetchTxResponse = consumer.fetchTx(request) + val (messages, lastConsumedOffset) = fetchTxResponse((config.topic, config.partition)) + println("Got "+messages.size+" messages from "+config.topic+" - "+config.partition+" with last offset: "+lastConsumedOffset) + for (message <- messages) { + messagesRead += 1 + bytesRead += message.payloadSize + } + if(messagesRead == 0 || totalMessagesRead > config.numMessages) + done = true + else + offset = lastConsumedOffset + } totalBytesRead += bytesRead totalMessagesRead += messagesRead @@ -154,5 +172,6 @@ object SimpleConsumerPerformance { val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) val hideHeader = options.has(hideHeaderOpt) val clientId = options.valueOf(clientIdOpt).toString + val readCommitted = true } } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index dcdc1ce..f7e4073 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -46,6 +46,8 @@ object ZkUtils extends Logging { val ReassignPartitionsPath = "/admin/reassign_partitions" val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" + val transactionIdPath = "/transaction/ids" + val transactionRecoveryPath = "/transaction/recovery_offsets" def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic @@ -61,6 +63,9 @@ object ZkUtils 
extends Logging { def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic + def getTransactionPartitionRecoveryPath(partition: Int): String = + transactionRecoveryPath + "/" + partition + def getController(zkClient: ZkClient): Int = { readDataMaybeNull(zkClient, ControllerPath)._1 match { case Some(controller) => KafkaController.parseControllerId(controller) @@ -617,6 +622,41 @@ object ZkUtils extends Logging { "replicas" -> e._2)))) } + def getTransactionRecoveryOffset(zkClient: ZkClient): immutable.Map[Int, Long] = { + val partitions = try { + getChildren(zkClient, transactionRecoveryPath) + } catch { + case nne: ZkNoNodeException => + zkClient.createPersistent(transactionRecoveryPath, true) + debug("Created path %s for transaction partition recovery".format(transactionRecoveryPath)) + Seq.empty[String] + case e2: Throwable => throw new AdminOperationException(e2.toString) + } + + val recoveryPoints = partitions.map { partition => { + val zkPath = getTransactionPartitionRecoveryPath(partition.toInt) + val data = readData(zkClient, zkPath)._1 + (partition.toInt, data.toLong) + }}.toMap + recoveryPoints + } + + def updateTransactionRecoveryOffset(zkClient: ZkClient, recoveryPoints: mutable.Map[Int, Long]) { + recoveryPoints.foreach{ case (partition: Int, recoveryPoint: Long) => { + val zkPath = getTransactionPartitionRecoveryPath(partition) + val data = recoveryPoint.toString + try { + updatePersistentPath(zkClient, zkPath, data) + info("Updated transaction partition %s with recovery offset %s".format(partition, recoveryPoint)) + } catch { + case nne: ZkNoNodeException => + ZkUtils.createPersistentPath(zkClient, zkPath, data) + debug("Created path %s with %s for transaction partition recovery".format(zkPath, recoveryPoint)) + case e2: Throwable => throw new AdminOperationException(e2.toString) + } + }} + } + def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) { val zkPath = 
ZkUtils.ReassignPartitionsPath partitionsToBeReassigned.size match { @@ -708,6 +748,26 @@ object ZkUtils extends Logging { }.flatten.toSet } } + + def getNewTransactionIdBatch(zkClient: ZkClient): Int = { + try { + val stat = zkClient.writeDataReturnStat(transactionIdPath, "", -1) + stat.getVersion + } catch { + case e: ZkNoNodeException => { + createParentPath(zkClient, transactionIdPath) + try { + zkClient.createPersistent(transactionIdPath, "") + } catch { + case e: ZkNodeExistsException => + case e2: Throwable => throw e2 + } + val stat = zkClient.writeDataReturnStat(transactionIdPath, "", -1) + stat.getVersion + } + case e2: Throwable => throw e2 + } + } } object ZKStringSerializer extends ZkSerializer { diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index a2117b3..5eb40a8 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -180,6 +180,25 @@ object SerializationTestUtils { ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError) } + def createTestTransactionRequest: TransactionRequest = { + new TransactionRequest(1, "client 1", 1000, + TransactionRequestInfo("group 1", 1, 1, 1000, Seq( + TopicAndPartition(topic1, 0), TopicAndPartition(topic2, 0) + ))) + } + + def createTestTransactionResponse: TransactionResponse = { + TransactionResponse(1, 1, 0.toShort) + } + + def createTestTxCoordinatorMetadataRequest: TxCoordinatorMetadataRequest = { + TxCoordinatorMetadataRequest("txGroup 1", clientId = "client 1") + } + + def createTestTxCoordinatorMetadataResponse: TxCoordinatorMetadataResponse = { + TxCoordinatorMetadataResponse(Some(brokers.head), ErrorMapping.NoError) + } + } class RequestResponseSerializationTest extends JUnitSuite { @@ -201,6 +220,10 @@ class RequestResponseSerializationTest extends JUnitSuite { private val 
consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode) + private val transactionRequest = SerializationTestUtils.createTestTransactionRequest + private val transactionResponse = SerializationTestUtils.createTestTransactionResponse + private val txCoordinatorMetadataRequest = SerializationTestUtils.createTestTxCoordinatorMetadataRequest + private val txCoordinatorMetadataResponse = SerializationTestUtils.createTestTxCoordinatorMetadataResponse @Test def testSerializationAndDeserialization() { @@ -214,7 +237,9 @@ class RequestResponseSerializationTest extends JUnitSuite { topicMetadataRequest, topicMetadataResponse, offsetCommitRequest, offsetCommitResponse, offsetFetchRequest, offsetFetchResponse, - consumerMetadataRequest, consumerMetadataResponse, consumerMetadataResponseNoCoordinator) + consumerMetadataRequest, consumerMetadataResponse, consumerMetadataResponseNoCoordinator, + transactionRequest, transactionResponse, + txCoordinatorMetadataRequest, txCoordinatorMetadataResponse) requestsAndResponses.foreach { original => val buffer = ByteBuffer.allocate(original.sizeInBytes)