From cbe0d244a984a8529f071383f5a6af1d3bad5ecd Mon Sep 17 00:00:00 2001 From: raulcf Date: Wed, 20 Aug 2014 09:14:00 -0700 Subject: [PATCH] KAFKA-1524; transactional producer, rolling back small change after review that made it break --- .../org/apache/kafka/clients/NetworkClient.java | 187 ++++++++++- .../producer/AbortTransactionException.java | 33 ++ .../InvalidTransactionStatusException.java | 34 ++ .../kafka/clients/producer/KafkaProducer.java | 188 ++++++++++- .../kafka/clients/producer/MockProducer.java | 31 +- .../apache/kafka/clients/producer/Producer.java | 24 +- .../kafka/clients/producer/ProducerConfig.java | 15 +- .../producer/internals/RecordAccumulator.java | 37 ++- .../clients/producer/internals/RecordBatch.java | 16 +- .../kafka/clients/producer/internals/Sender.java | 1 + .../producer/internals/TransactionContext.java | 364 +++++++++++++++++++++ .../producer/internals/TransactionControl.java | 33 ++ ...ransactionCoordinatorNotAvailableException.java | 37 +++ .../common/errors/TransactionFailedException.java | 37 +++ .../org/apache/kafka/common/record/Compressor.java | 19 +- .../apache/kafka/common/record/MemoryRecords.java | 14 +- .../org/apache/kafka/common/record/Record.java | 20 +- .../clients/producer/RecordAccumulatorTest.java | 16 +- .../apache/kafka/clients/producer/SenderTest.java | 10 +- .../clients/producer/TransactionContextTest.java | 84 +++++ .../kafka/common/record/MemoryRecordsTest.java | 3 +- .../org/apache/kafka/common/record/RecordTest.java | 2 +- 22 files changed, 1114 insertions(+), 91 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index eea270a..8244ab9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
@@ -16,12 +16,17 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; +import org.apache.kafka.clients.consumer.OffsetMetadata; +import org.apache.kafka.clients.producer.InvalidTransactionStatusException; import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.clients.producer.internals.TransactionContext; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; @@ -30,9 +35,14 @@ import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.requests.TransactionCoordinatorMetadataRequest; +import org.apache.kafka.common.requests.TransactionCoordinatorMetadataResponse; +import org.apache.kafka.common.requests.TransactionRequest; +import org.apache.kafka.common.requests.TransactionResponse; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +63,9 @@ public class NetworkClient implements KafkaClient { /* the current cluster metadata */ private final Metadata metadata; + /* the transaction context */ + private TransactionContext txContext; + /* the state of each node's connection */ private final ClusterConnectionStates connectionStates; @@ -100,6 +113,18 @@ public class NetworkClient implements KafkaClient { this.lastNoNodeAvailableMs = 0; } + public NetworkClient(Selectable selector, + Metadata metadata, + String clientId, + int maxInFlightRequestsPerConnection, + long reconnectBackoffMs, + int socketSendBuffer, + int socketReceiveBuffer, + TransactionContext txContext) { + this(selector, metadata, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer); + this.txContext = txContext; + } + /** * Begin connecting to the given node, return true if we are already connected and ready to send to that node. 
* @param node The node to check @@ -155,12 +180,17 @@ public class NetworkClient implements KafkaClient { public List poll(List requests, long timeout, long now) { List sends = new ArrayList(); + maybeUpdateTxCoordinatorMetadata(sends, now); + + // check if there are pending transaction control messages + maybeSendTxControlData(sends, now, timeout); + for (int i = 0; i < requests.size(); i++) { ClientRequest request = requests.get(i); int nodeId = request.request().destination(); - if (!isSendable(nodeId)) + if (!isSendable(nodeId)) { throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); - + } this.inFlightRequests.add(request); sends.add(request.request()); } @@ -177,7 +207,10 @@ public class NetworkClient implements KafkaClient { try { this.selector.poll(Math.min(timeout, metadataTimeout), sends); } catch (IOException e) { - log.error("Unexpected error during I/O in producer network thread", e); + if (txContext != null) { + txContext.maybeAbortTransaction(); + } + log.error("Unexpected error during I/O in producer network thread", e); } List responses = new ArrayList(); @@ -280,8 +313,13 @@ public class NetworkClient implements KafkaClient { short apiKey = req.request().header().apiKey(); Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); correlate(req.request().header(), header); + log.trace("apiKEY: {}", apiKey); if (apiKey == ApiKeys.METADATA.id) { handleMetadataResponse(req.request().header(), body, now); + } else if(apiKey == ApiKeys.TX_COORDINATOR.id) { + handleTxCoordinatorMetadataResponse(req.request().header(), body); + } else if(apiKey == ApiKeys.TX.id) { + handleTxResponse(body); } else { // need to add body/header to response here responses.add(new ClientResponse(req, now, false, body)); @@ -302,6 +340,35 @@ public class NetworkClient implements KafkaClient { } /** + * Handle tx coordinator metadata request + * @param header The response header + * @param body The txCoordinator metadata body + * @param now The current time + */ + private void handleTxCoordinatorMetadataResponse(RequestHeader header, Struct body) { + this.metadataFetchInProgress = false; + TransactionCoordinatorMetadataResponse response = new TransactionCoordinatorMetadataResponse(body); + Node nInfo = response.getNode(); + if(nInfo.host().equals("") || nInfo.port() == -1 || nInfo.id() == -1){ // check node is valid + log.trace("Ignoring empty coordinator node response response, correlation id {}.", header.correlationId()); + this.txContext.doUpdateTxCoordinator(); + } else { + this.txContext.setTxCoordinatorNode(nInfo); + } + } + + /** + * Handle transaction response + * @param header The response header + * @param body The txResponse body + * @param now The current time + */ + private void handleTxResponse(Struct body){ + TransactionResponse response = new TransactionResponse(body); + this.txContext.txResponseAvailable(response); + } + + /** * Handle any disconnected connections * @param responses The list of responses that completed with the disconnection * @param now The current time @@ -338,11 +405,38 @@ public class NetworkClient implements KafkaClient { * Validate that the response corresponds to the request we expect or else explode */ private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { - if (requestHeader.correlationId() != responseHeader.correlationId()) - throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + - ") does not match request 
(" + - requestHeader.correlationId() + - ")"); + if (requestHeader.correlationId() != responseHeader.correlationId()) { + try { + if (txContext != null) { + txContext.maybeAbortTransaction(); + } + } catch (InvalidTransactionStatusException itse) { + log.error("Invalid TX status: "+itse.getMessage()); + } finally { + throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + + ") does not match request (" + + requestHeader.correlationId() + + ")"); + } + } + } + + /** + * Create a transaction coordinator metadata request + */ + private ClientRequest txCoordinatorMetadataRequest(long now, int node, String producerGroupId){ + TransactionCoordinatorMetadataRequest metadata = new TransactionCoordinatorMetadataRequest(producerGroupId); + RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.TX_COORDINATOR), metadata.toStruct()); + return new ClientRequest(now, true, send, null); + } + + /** + * Creat a offset commit request + */ + private ClientRequest offsetCommitRequest(long now, int node, Map txOffsets){ + OffsetCommitRequest ocr = new OffsetCommitRequest("0", 0, "0", txOffsets); + RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.OFFSET_COMMIT), ocr.toStruct()); + return new ClientRequest(now, true, send, null); } /** @@ -381,6 +475,79 @@ public class NetworkClient implements KafkaClient { } } + private void maybeSendMetadata(List sends, ClientRequest metadataRequest, long now) { + Node node = this.leastLoadedNode(now); + if (node == null) { + log.debug("Give up sending metadata request since no node is available"); + // mark the timestamp for no node available to connect + this.lastNoNodeAvailableMs = now; + return; + } + log.debug("Trying to send clientRequest to node {}", node.id()); + if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { + sends.add(metadataRequest.request()); + this.inFlightRequests.add(metadataRequest); + } else if (connectionStates.canConnect(node.id(), now)){ + log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id()); + initiateConnect(node, now); + } + } + + /** + * Add a txCoordinator metadata request to the list of sends if we need to make one + * @param sends + * @param now + */ + private void maybeUpdateTxCoordinatorMetadata(List sends, long now) { + if(txContext != null) { + Node node = this.leastLoadedNode(now); + if (node == null) + return; + if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { + // Check if it is necessary to update tx Coordinator + if (txContext.needToUpdateTxCoordinator()) { + ClientRequest txCoordinatorMetadataRequest = txCoordinatorMetadataRequest(now, node.id(), txContext.getTxGroupId()); + log.debug("Sending txCoordinatorMetadata request {} to node {}", txCoordinatorMetadataRequest, node.id()); + sends.add(txCoordinatorMetadataRequest.request()); + this.inFlightRequests.add(txCoordinatorMetadataRequest); + return; // already included one request + } + + // Check if it is necessary to commit offsets + if (txContext.getTxOffsetsAvailable() != null) { + ClientRequest offsetCommitRequest = offsetCommitRequest(now, node.id(), txContext.getTxOffsets()); + log.debug("Sending offset commit request {} to node {}", offsetCommitRequest, node.id()); + sends.add(offsetCommitRequest.request()); + this.inFlightRequests.add(offsetCommitRequest); + return; + } + } else if (connectionStates.canConnect(node.id(), now)) { + // we don't have a connection to this node 
right now, make one + initiateConnect(node, now); + } + } + } + + /** + * Add a transaction request to the list of sends if there is some available + */ + private void maybeSendTxControlData(List sends, long now, long timeout){ + if (txContext != null) { + TransactionRequest tr; + if ((tr = txContext.getTxRequest()) != null) { + txContext.ensureTxCoordinatorInfoIsAvailable(timeout); + int nodeId = txContext.getTxCoordinatorNode().id(); + if (!connectionStates.isConnected(nodeId)) + initiateConnect(txContext.getTxCoordinatorNode(), now); + RequestSend send = new RequestSend(nodeId, nextRequestHeader(ApiKeys.TX), tr.toStruct()); + ClientRequest cr = new ClientRequest(now, true, send, null); + log.debug("Sending tx control: {} to node: {}", cr, nodeId); + sends.add(cr.request()); + this.inFlightRequests.add(cr); + } + } + } + /** * Initiate a connection to the given node */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java b/clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java new file mode 100644 index 0000000..2be5409 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.KafkaException; + +/** + * This exception is thrown after an exception has been aborted. It is meant to notify the caller that the previous ongoing transaction is now aborted. + * It will usually contain a Throwable that represents the underlying exception that trigger the abort. + */ +public class AbortTransactionException extends KafkaException { + + private static final long serialVersionUID = 1L; + + public AbortTransactionException(String message) { + super(message); + } + + public AbortTransactionException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java b/clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java new file mode 100644 index 0000000..5b18906 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.KafkaException; + +/** + * This exception is thrown if the producer tries to begin, abort or commit a transaction while in an invalid status, i.e + * tries to begin a transaction when one is already ongoing, or tries to abort or commit a transaction when there is no + * ongoing transaction. + */ +public class InvalidTransactionStatusException extends KafkaException { + + private static final long serialVersionUID = 1L; + + public InvalidTransactionStatusException(String message) { + super(message); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f58b850..25307fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
@@ -14,6 +14,7 @@ package org.apache.kafka.clients.producer; import java.net.InetSocketAddress; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -22,10 +23,13 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.OffsetMetadata; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.Partitioner; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; +import org.apache.kafka.clients.producer.internals.TransactionContext; +import org.apache.kafka.clients.producer.internals.TransactionControl; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -44,6 +48,8 @@ import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.requests.TransactionRequest; +import org.apache.kafka.common.requests.TransactionResponse; import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.SystemTime; @@ -66,6 +72,8 @@ public class KafkaProducer implements Producer { private final Partitioner partitioner; private final int maxRequestSize; private final long metadataFetchTimeoutMs; + private final int txRequestTimeoutMs; + private final int ackTimeoutMs; private final long totalMemorySize; private final Metadata metadata; private final RecordAccumulator accumulator; @@ -74,6 +82,8 @@ public class KafkaProducer implements Producer { private final Thread ioThread; private final CompressionType compressionType; private final Sensor errors; + private final TransactionContext txContext; + private final String clientTxGroup; private final Time time; /** @@ -109,6 +119,7 @@ public class KafkaProducer implements Producer { this.partitioner = new Partitioner(); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + this.txRequestTimeoutMs = config.getInt(ProducerConfig.TX_REQUEST_TIMEOUT_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); @@ -121,15 +132,24 @@ public class KafkaProducer implements Producer { metrics, time); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + this.clientTxGroup = config.getString(ProducerConfig.CLIENT_TX_GROUP_CONFIG); + this.ackTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); + if (! clientTxGroup.equals("")) { // is transactional ? 
+ this.txContext = new TransactionContext(clientTxGroup); + this.txContext.doUpdateTxCoordinator(); // to set up the transaction coordinator + } else{ + this.txContext = null; + } NetworkClient client = new NetworkClient(new Selector(this.metrics, time), - this.metadata, - clientId, - config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), - config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), - config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), - config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + this.metadata, + clientId, + config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), + config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), + this.txContext); this.sender = new Sender(client, this.metadata, this.accumulator, @@ -157,6 +177,120 @@ public class KafkaProducer implements Producer { } /** + * {@inheritDoc} + * Tries to begin a transaction by sending a BEGIN message to the current transaction coordinator. + * Receives the trasaction ID with the response and sets up a new transaction context + * @see org.apache.kafka.clients.producer.Producer#begin() + * @param txExpirationTime The maximum time a transaction can be alive, after which it will be expired + */ + public boolean begin(int txExpirationTime){ + if(!txContext.canBeginTransaction()){ + return false; + } + short txControl = TransactionControl.codeFor(TransactionControl.BEGIN); + TransactionRequest beginRequest = new TransactionRequest(txContext.getTxGroupId(), -1, txControl, txExpirationTime, ackTimeoutMs, + new HashSet()); + log.trace("Send tx request: {}, BEGIN", beginRequest); + TransactionResponse beginResponse = sendTransactionRequest(beginRequest, txRequestTimeoutMs); + log.trace("Received tx response: {}, BEGIN", beginResponse); + beginResponse.getError().maybeThrow(); + int txId = beginResponse.getTxId(); + txContext.beginTransaction(txId); + return true; + } + + /** + * {@inheritDoc} + * Flushes all pending messages first, sends the ABORT message to the transaction coordinator and + * cleans the current transaction context. + * @see org.apache.kafka.clients.producer.Producer#abort() + */ + public void abort(){ + if(accumulator.hasUnsent()){ + accumulator.flush(); + sender.wakeup(); + } + txContext.waitForAck(txRequestTimeoutMs); + accumulator.resetFlush(); + short txControl = TransactionControl.codeFor(TransactionControl.ABORT); + TransactionRequest abortRequest = new TransactionRequest(clientTxGroup, txContext.getTxId(), txControl, + txRequestTimeoutMs, ackTimeoutMs, txContext.getParticipatingPartitions()); + log.trace("Send tx request: {}, ABORT", abortRequest); + TransactionResponse abortResponse = sendTransactionRequest(abortRequest, txRequestTimeoutMs); + log.trace("Received tx response: {}, ABORT", abortResponse); + abortResponse.getError().maybeThrow(); + txContext.abortTransaction(); + } + + /** + * {@inheritDoc} + * Flushes pending messages first and blocks until it has successfully receives answers for all of them. + * It then sends a COMMIT message to the transaction coordinator and cleans the transaction context. 
+ * @see org.apache.kafka.clients.producer.Producer#commit() + */ + public boolean commit(Map offsets){ + if (!txContext.canCommitTransaction()) { + return false; + } + if(offsets != null){ + // commit offsets, not in this patch + } + if(accumulator.hasUnsent()){ + accumulator.flush(); + sender.wakeup(); + } + txContext.waitForAck(txRequestTimeoutMs); + accumulator.resetFlush(); + short txControl = TransactionControl.codeFor(TransactionControl.COMMIT); + TransactionRequest commitRequest = new TransactionRequest(clientTxGroup, txContext.getTxId(), txControl, + txRequestTimeoutMs, ackTimeoutMs, txContext.getParticipatingPartitions()); + log.trace("Send tx request: {}, COMMIT", commitRequest); + TransactionResponse commitResponse = sendTransactionRequest(commitRequest, txRequestTimeoutMs); + log.trace("Received tx response: {}, COMMIT", commitResponse); + commitResponse.getError().maybeThrow(); + txContext.commitTransaction(); + return true; + } + + /** + * Send tx request and blocks for a max time to get the transaction response + * @param tr + * @param metadataFetchTimeoutMs + * @return + */ + private TransactionResponse sendTransactionRequest(TransactionRequest tr, int maxWaitMs) { + // Block until tx coordinator info is available + try { + this.txContext.ensureTxCoordinatorInfoIsAvailable(maxWaitMs); + } catch (TimeoutException te) { + log.info("Timeout to get TxCoordinadorInfo. Aborting TX id: {} ", txContext.getTxId()); + txContext.maybeAbortTransaction(); + throw new AbortTransactionException("Transaction was aborted due to underlying exception", te); + } + this.txContext.txRequestAvailable(tr); + long begin = System.currentTimeMillis(); + long remainingWaitMs = maxWaitMs; + do { + TransactionResponse response = txContext.getTxResponse(); + sender.wakeup(); + if (response == null) { + txContext.waitForTxResponse(remainingWaitMs); + sender.wakeup(); + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) { + log.info("No TxResponse on time. Aborting TX id: {} ", txContext.getTxId()); + txContext.maybeAbortTransaction(); + TimeoutException te = new TimeoutException("Failed to get tx control response after " + maxWaitMs + " ms."); + throw new AbortTransactionException("Transaction was aborted due to underlying exception", te); + } + remainingWaitMs = maxWaitMs - elapsed; + } else { + return response; + } + } while (true); + } + + /** * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} */ @Override @@ -180,14 +314,14 @@ public class KafkaProducer implements Producer { * sending the record. *

      * If you want to simulate a simple blocking call you can do the following:
-     * 
+     *
      * <pre>
      *   producer.send(new ProducerRecord("the-topic", "key", "value")).get();
      * </pre>
      * <p>
      * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
-     * 
+     *
      * <pre>
      *   ProducerRecord record = new ProducerRecord("the-topic", "key", "value");
      *   producer.send(myRecord,
@@ -199,10 +333,10 @@ public class KafkaProducer implements Producer {
      *                     }
      *                 });
      * </pre>
-     * 
+     *
      * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
      * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
-     * 
+     *
      * <pre>
      * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
      * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
@@ -219,7 +353,7 @@ public class KafkaProducer implements Producer {
      * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
      * in cases where non-blocking usage is desired the setting block.on.buffer.full=false will cause the
      * producer to instead throw an exception when buffer memory is exhausted.
-     * 
+     *
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
@@ -234,7 +368,28 @@ public class KafkaProducer implements Producer {
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
+            RecordAccumulator.RecordAppendResult result;
+            // If a transaction is in flight, append the txId to the messages and track their futures.
+            if (txContext != null && txContext.isTherePendingTransaction()) {
+                txContext.addTopicPartitionToParticipatingTransactions(tp);
+                log.trace("Sending TX record {}, from tx {} to topic {} partition {}", record, txContext.getTxId(), record.topic(), partition);
+                result = accumulator.append(tp, record.key(), record.value(), compressionType, callback, txContext.getTxId());
+                txContext.addPendingMessage(result.future);
+            } else if (txContext != null && txContext.isTxAborted()) {
+                // complete the future with an error, as the tx is aborted and has not been handled yet
+                InvalidTransactionStatusException itse = new InvalidTransactionStatusException("Attempt to send tx data while in ABORTED status. Clean the aborted tx first");
+                log.debug("Exception occurred during message send:", itse);
+                if (callback != null)
+                    callback.onCompletion(null, itse);
+                this.errors.record();
+                return new FutureFailure(itse);
+            } else {
+                // no transaction in flight (or a non-transactional producer): append with the sentinel txId -1
+                result = accumulator.append(tp, record.key(), record.value(), compressionType, callback, -1);
+            }
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -244,15 +399,20 @@ public class KafkaProducer implements Producer {
             // For API exceptions return them in the future,
             // for other exceptions throw directly
         } catch (ApiException e) {
+            if (txContext != null) {
+                txContext.maybeAbortTransaction();
+            }
             log.debug("Exception occurred during message send:", e);
             if (callback != null)
                 callback.onCompletion(null, e);
             this.errors.record();
             return new FutureFailure(e);
         } catch (InterruptedException e) {
+            if (txContext != null)
+                txContext.maybeAbortTransaction();
             this.errors.record();
             throw new KafkaException(e);
         } catch (KafkaException e) {
+            if (txContext != null)
+                txContext.maybeAbortTransaction();
             this.errors.record();
             throw e;
         }
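
A minimal lifecycle sketch of the API added above, assuming an already-constructed producer configured with a non-empty client.txgroup.id (the topic name, payload, and expiration value are illustrative, not part of this patch):

    // begin() returns false if another transaction is already ongoing
    if (producer.begin(60 * 1000)) {                    // illustrative 60 s expiration time
        try {
            producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes()));
            producer.commit(null);                      // flushes, waits for all acks, then sends COMMIT; null means no offsets
        } catch (Exception e) {
            producer.abort();                           // flushes pending data and sends ABORT to the coordinator
        }
    }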
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index c0f1d57..3d70951 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import org.apache.kafka.clients.consumer.OffsetMetadata;
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.Partitioner;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
@@ -51,7 +52,7 @@ public class MockProducer implements Producer {
 
     /**
      * Create a mock producer
-     * 
+     *
      * @param cluster The cluster holding metadata for this producer
      * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
      *        the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
@@ -68,7 +69,7 @@ public class MockProducer implements Producer {
 
     /**
      * Create a new mock producer with invented metadata the given autoComplete setting.
-     * 
+     *
      * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(null, autoComplete)}
      */
     public MockProducer(boolean autoComplete) {
@@ -77,16 +78,31 @@ public class MockProducer implements Producer {
 
     /**
      * Create a new auto completing mock producer
-     * 
+     *
      * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
      */
     public MockProducer() {
         this(true);
     }
 
+    @Override
+    public boolean begin(int txExpirationTime){
+        return true;
+    }
+
+    @Override
+    public void abort(){
+
+    }
+
+    @Override
+    public boolean commit(Map<TopicPartition, OffsetMetadata> offsets){
+        return true;
+    }
+
     /**
      * Adds the record to the list of sent records. The {@link RecordMetadata} returned will be immediately satisfied.
-     * 
+     *
      * @see #history()
      */
     @Override
@@ -96,7 +112,7 @@ public class MockProducer implements Producer {
 
     /**
      * Adds the record to the list of sent records.
-     * 
+     *
      * @see #history()
      */
     @Override
@@ -161,7 +177,7 @@ public class MockProducer implements Producer {
 
     /**
      * Complete the earliest uncompleted call successfully.
-     * 
+     *
      * @return true if there was an uncompleted call to complete
      */
     public synchronized boolean completeNext() {
@@ -170,7 +186,7 @@ public class MockProducer implements Producer {
 
     /**
      * Complete the earliest uncompleted call with the given error.
-     * 
+     *
      * @return true if there was an uncompleted call to complete
      */
     public synchronized boolean errorNext(RuntimeException e) {
@@ -212,5 +228,4 @@ public class MockProducer implements Producer {
             }
         }
     }
-
 }
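
Because begin() and commit() above unconditionally return true and abort() is a no-op, transactional call sequences can be exercised in unit tests without a broker. An illustrative sketch:

    MockProducer producer = new MockProducer(true);     // auto-complete every send
    producer.begin(1000);                               // always returns true in the mock
    producer.send(new ProducerRecord("the-topic", "k".getBytes(), "v".getBytes()));
    producer.commit(null);                              // always returns true in the mock
    List sent = producer.history();                     // the records captured by the mock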
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 36e8398..330c19c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -21,21 +21,41 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import org.apache.kafka.clients.consumer.OffsetMetadata;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 
 
 /**
  * The interface for the {@link KafkaProducer}
- * 
+ *
  * @see KafkaProducer
  * @see MockProducer
  */
 public interface Producer extends Closeable {
 
     /**
+     * Begins a transaction with the given expiration timeout.
+     * @param txExpirationTime The maximum time a transaction can be alive, after which it will be expired
+     * @return true if the transaction was begun, false if there is already an ongoing transaction
+     */
+    public boolean begin(int txExpirationTime);
+
+    /**
+     * Aborts the current transaction.
+     */
+    public void abort();
+
+    /**
+     * Commits the current transaction.
+     * @param offsets The consumer offsets to commit along with the transaction, or null if there are none
+     * @return true if the transaction was committed, false if there is no transaction that can be committed
+     */
+    public boolean commit(Map<TopicPartition, OffsetMetadata> offsets);
+
+    /**
      * Send the given record asynchronously and return a future which will eventually contain the response information.
-     * 
+     *
      * @param record The record to send
      * @return A future which will eventually contain the response information
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9de4af..f825c40 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -49,6 +49,10 @@ public class ProducerConfig extends AbstractConfig {
     private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the " + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata "
                                                              + "fetch to succeed before throwing an exception back to the client.";
 
+    /** tx.request.timeout.ms */
+    public static final String TX_REQUEST_TIMEOUT_CONFIG = "tx.request.timeout.ms";
+    private static final String TX_REQUEST_TIMEOUT_DOC = "The timeout for sending transaction requests (begin, abort, commit)";
+
     /** metadata.max.age.ms */
     public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
     private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions.";
@@ -115,6 +119,11 @@ public class ProducerConfig extends AbstractConfig {
     public static final String CLIENT_ID_CONFIG = "client.id";
     private static final String CLIENT_ID_DOC = "The id string to pass to the server when making requests. The purpose of this is to be able to track the source " + "of requests beyond just ip/port by allowing a logical application name to be included with the request. The "
                                                 + "application can set any string it wants as this has no functional purpose other than in logging and metrics.";
+    /** client.txgroup.id */
+    public static final String CLIENT_TX_GROUP_CONFIG = "client.txgroup.id";
+    private static final String CLIENT_TX_GROUP_DOC = "The transaction group of the producer. A transaction group is a collection of producers "
+                                                + "who communicate with the same transaction manager when creating transactions. To provide commitment ordering, a producer has to send all "
+                                                + "its transactions to the same transaction manager, otherwise no ordering is guaranteed.";
 
     /** send.buffer.bytes */
     public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
@@ -181,6 +190,7 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
+                                .define(CLIENT_TX_GROUP_CONFIG, Type.STRING, "", Importance.LOW, CLIENT_TX_GROUP_DOC)
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC)
                                 .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
                                 .define(MAX_REQUEST_SIZE_CONFIG,
@@ -199,6 +209,7 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         METADATA_FETCH_TIMEOUT_DOC)
+                                .define(TX_REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.LOW, TX_REQUEST_TIMEOUT_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                         Type.LONG,
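
Both new settings are optional: client.txgroup.id defaults to the empty string, which leaves the producer non-transactional, and tx.request.timeout.ms defaults to 30000 ms. An illustrative configuration (the broker address is an assumption):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
    props.put("client.txgroup.id", "my-tx-group");      // non-empty value enables the TransactionContext
    props.put("tx.request.timeout.ms", "30000");        // timeout for BEGIN/ABORT/COMMIT round trips
    Producer producer = new KafkaProducer(props);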
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index c5d4700..19f1878 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -54,6 +54,8 @@ public final class RecordAccumulator {
     private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
 
     private volatile boolean closed;
+    /* Indicates if it's been requested to flush the complete RecordAccumulator */
+    private volatile boolean flushRequested;
     private int drainIndex;
     private final int batchSize;
     private final long lingerMs;
@@ -64,7 +66,7 @@ public final class RecordAccumulator {
 
     /**
      * Create a new record accumulator
-     * 
+     *
      * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
      * @param totalSize The maximum memory the record accumulator can use.
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
@@ -86,6 +88,7 @@ public final class RecordAccumulator {
                              Time time) {
         this.drainIndex = 0;
         this.closed = false;
+        this.flushRequested = false;
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
@@ -131,7 +134,8 @@ public final class RecordAccumulator {
      * @param compression The compression codec for the record
      * @param callback The user-supplied callback to execute when the request is complete
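+     * @param txId The id of the transaction the record belongs to, or -1 if the record is not transactional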
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, 
+                                       Callback callback, int txId) throws InterruptedException {
         if (closed)
             throw new IllegalStateException("Cannot send after the producer is closed.");
         // check if we have an in-progress batch
@@ -139,7 +143,7 @@ public final class RecordAccumulator {
         synchronized (dq) {
             RecordBatch last = dq.peekLast();
             if (last != null) {
-                FutureRecordMetadata future = last.tryAppend(key, value, callback);
+                FutureRecordMetadata future = last.tryAppend(key, value, txId, callback);
                 if (future != null) {
                     return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                 }
@@ -153,7 +157,7 @@ public final class RecordAccumulator {
         synchronized (dq) {
             RecordBatch last = dq.peekLast();
             if (last != null) {
-                FutureRecordMetadata future = last.tryAppend(key, value, callback);
+                FutureRecordMetadata future = last.tryAppend(key, value, txId, callback);
                 if (future != null) {
                     // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
                     // often...
@@ -163,7 +167,7 @@ public final class RecordAccumulator {
             }
             MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
             RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
-            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
+            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, txId, callback));
 
             dq.addLast(batch);
             return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
@@ -195,6 +199,7 @@ public final class RecordAccumulator {
      * 
      * <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are
      * immediately considered ready).
      * <li>The accumulator has been closed
  • There is a flushRequested. For example, a transaction has produced all its data. * */ public ReadyCheckResult ready(Cluster cluster, long nowMs) { @@ -220,7 +225,7 @@ public final class RecordAccumulator { long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); boolean expired = waitedTimeMs >= lingerMs; - boolean sendable = full || expired || exhausted || closed; + boolean sendable = full || expired || exhausted || closed || flushRequested; if (sendable && !backingOff) readyNodes.add(leader); nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); @@ -249,7 +254,7 @@ public final class RecordAccumulator { /** * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over. - * + * * @param cluster The current cluster metadata * @param nodes The list of node to drain * @param maxSize The maximum number of bytes to drain @@ -317,6 +322,20 @@ public final class RecordAccumulator { } /** + * Sets the flush flag to true + */ + public void flush(){ + this.flushRequested = true; + } + + /** + * Unsets the flush flag + */ + public void resetFlush(){ + this.flushRequested = false; + } + + /** * Close this accumulator and force all the record buffers to be drained */ public void close() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index dd0af8a..f028a22 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory; /** * A batch of records that is or will be sent. - * + * * This class is not thread safe and external synchronization must be used when modifying it */ public final class RecordBatch { @@ -53,14 +53,14 @@ public final class RecordBatch { /** * Append the record to the current record set and return the relative offset within that record set - * + * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. 
*/ - public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { + public FutureRecordMetadata tryAppend(byte[] key, byte[] value, int txId, Callback callback) { if (!this.records.hasRoomFor(key, value)) { return null; } else { - this.records.append(0L, key, value); + this.records.append(0L, key, value, txId); this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) @@ -72,7 +72,7 @@ public final class RecordBatch { /** * Complete the request - * + * * @param baseOffset The base offset of the messages assigned by the server * @param exception The exception that occurred (or null if the request was successful) */ @@ -113,4 +113,4 @@ public final class RecordBatch { public String toString() { return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")"; } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 8ebe7ed..01e7953 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -178,6 +178,7 @@ public class Sender implements Runnable { handleDisconnect(response, now); else handleResponse(response, now); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java new file mode 100644 index 0000000..ad91085 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.clients.producer.internals; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.producer.InvalidTransactionStatusException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TransactionFailedException; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.TransactionRequest; +import org.apache.kafka.common.requests.TransactionResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class encapsulates state and logic for transactional producers. + * This class is shared by the client thread and the background sender thread. + */ +public class TransactionContext { + + private static final Logger log = LoggerFactory.getLogger(TransactionContext.class); + + private enum TransactionStatus{ + PENDING, NOTRANSACTION, ABORTED + } + + /* The status of the current transaction */ + private TransactionStatus txStatus; + /* Unique transaction id */ + private int txId; + /* These are the partitions involved in the current transaction */ + private Set participatingPartitions; + /* All Futures to messages sent within the current transaction */ + private Set pendingMessages; + /* Indicates whether is necessary to update tx metadata */ + private AtomicBoolean updateTxCoordinatorMetadata; + /* Stores the offsets required to redo this tx, i.e. the offsets of the first message within the tx */ + private AtomicReference> txOffsets; + /* String identifying the tx producer group */ + private final String txGroupId; + /* The node with the information for the transaction coordinator */ + private Node txCoordinatorNode; + /* Keeps a transaction request until it is written */ + private AtomicReference pendingTransactionRequest; + /* Keeps the transaction response until it is read */ + private AtomicReference transactionResponse; + + public TransactionContext(String txGroupId){ + this.txStatus = TransactionStatus.NOTRANSACTION; + this.txId = -1; + this.participatingPartitions = new HashSet(); + this.pendingMessages = new HashSet(); + this.txGroupId = txGroupId; + this.pendingTransactionRequest = new AtomicReference(); + this.transactionResponse = new AtomicReference(); + this.txOffsets = new AtomicReference>(); + this.updateTxCoordinatorMetadata = new AtomicBoolean(false); + } + + public void setTxId(int txId){ + this.txId = txId; + } + + public int getTxId(){ + return txId; + } + + public void setTxOffsets(Map txOffsets){ + this.txOffsets = new AtomicReference>(txOffsets); + } + + /** + * Indicates it is required to update transaction metadata (coordinator node) + */ + public void doUpdateTxCoordinator(){ + this.updateTxCoordinatorMetadata.set(true); + } + + /** + * Indicates if it is necessary to update the transaction metadata (coordinator node) + * @return + */ + public boolean needToUpdateTxCoordinator(){ + return this.updateTxCoordinatorMetadata.getAndSet(false); + } + + /** + * Return the offsets required to restart the transaction, i.e. 
those of the first tx message + * @return + */ + public Map getTxOffsets(){ + return this.txOffsets.getAndSet(null); + } + + public Map getTxOffsetsAvailable(){ + return this.txOffsets.get(); + } + + /** + * Blocks until a valid transaction coordinator is available or the timeout expires + * @param maxWaitMs + */ + public void ensureTxCoordinatorInfoIsAvailable(long maxWaitMs){ + long begin = System.currentTimeMillis(); + long remainingWaitMs = maxWaitMs; + do{ + try { + if (isTxCoordinatorAvailable()) + return; + log.trace("Tx Coordinator Metadata not available. Fetching..."); + synchronized (this) { + wait(remainingWaitMs); + } + } catch (InterruptedException e) { /* this is fine, just try again */ + } + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) + throw new TimeoutException("Failed to receive all tx ACKS before " + maxWaitMs + " ms."); + remainingWaitMs = maxWaitMs - elapsed; + } + while(true); + } + + /** + * Add a new future to the pending messages + * @param future The new pending message + */ + public void addPendingMessage(FutureRecordMetadata future){ + pendingMessages.add(future); + } + + public boolean isTxAborted() { + return this.txStatus.equals(TransactionStatus.ABORTED); + } + + /** + * Sets up transaction context to track a new transaction + * @param txId The id of the current transaction + * @throws InvalidTransactionStatusException + */ + public void beginTransaction(int txId) throws InvalidTransactionStatusException { + if(txStatus == TransactionStatus.PENDING) + throw new InvalidTransactionStatusException("Attempt to start a transaction before finishing a previous one"); + log.trace("Starting transaction {} ", txId); + this.txId = txId; + txStatus = TransactionStatus.PENDING; + } + + /** + * Sets up transaction context to abort the current transaction + * @throws InvalidTransactionStatusException + */ + public void abortTransaction() throws InvalidTransactionStatusException { + if(txStatus == TransactionStatus.NOTRANSACTION) + throw new InvalidTransactionStatusException("Attempt to abort a non-existent transaction"); + log.trace("Aborting transaction {} ", txId); + txStatus = TransactionStatus.ABORTED; + this.clearTransactionContext(); + } + + /** + * Sets up transaction context to commit the current transaction + * @throws InvalidTransactionStatusException + */ + public void commitTransaction() throws InvalidTransactionStatusException { + if(txStatus == TransactionStatus.ABORTED | txStatus == TransactionStatus.NOTRANSACTION) + throw new InvalidTransactionStatusException("Attempt to commit an aborted or non-existent transaction"); + log.trace("Committing transaction {} ", txId); + txStatus = TransactionStatus.NOTRANSACTION; + this.clearTransactionContext(); + } + + /** + * Checks whether transaction can begin + */ + public boolean canBeginTransaction(){ + if(! (txStatus == TransactionStatus.NOTRANSACTION)){ + return false; + } + return true; + } + + /** + * Aborts a transaction if it is in a valid state + */ + public void maybeAbortTransaction() { + if(! (txStatus == TransactionStatus.PENDING || txStatus == TransactionStatus.ABORTED)){ + abortTransaction(); + } + } + + /** + * Checks whether transaction can commit + */ + public boolean canCommitTransaction(){ + if(! (txStatus == TransactionStatus.PENDING)){ + log.debug("Cannot commit tx: "); + return false; + } + if(participatingPartitions.isEmpty()) { + return false; + } + return true; + } + + /** + * Waits until all messages of current transaction are ACK. 
+
+    /**
+     * Waits until all messages of the current transaction are acknowledged.
+     * Fails if a message fails or times out.
+     * @param maxWaitMs ACK timeout
+     * @throws TimeoutException if the ACKs do not arrive in time
+     * @throws TransactionFailedException if any message of the transaction failed
+     */
+    public void waitForAck(long maxWaitMs) throws TimeoutException {
+        long begin = System.currentTimeMillis();
+        long remainingWaitMs = maxWaitMs;
+        Iterator<FutureRecordMetadata> iMsg = pendingMessages.iterator();
+        while (iMsg.hasNext()) {
+            FutureRecordMetadata msg = iMsg.next();
+            try {
+                msg.get(remainingWaitMs, TimeUnit.MILLISECONDS);
+                long elapsed = System.currentTimeMillis() - begin;
+                remainingWaitMs = maxWaitMs - elapsed;
+            } catch (InterruptedException e) {
+                throw new TransactionFailedException("Some messages of the transaction failed", e);
+            } catch (ExecutionException e) {
+                throw new TransactionFailedException("Some messages of the transaction failed", e);
+            } catch (java.util.concurrent.TimeoutException e) {
+                throw new TimeoutException("Failed to receive all tx ACKs before " + maxWaitMs + " ms.");
+            }
+        }
+        pendingMessages.clear();
+    }
+
+    /**
+     * Checks if there is a pending transaction
+     * @return true if there is a pending transaction, false otherwise
+     */
+    public boolean isTherePendingTransaction() {
+        return txStatus == TransactionStatus.PENDING;
+    }
+
+    public Set<TopicPartition> getParticipatingPartitions() {
+        return this.participatingPartitions;
+    }
+
+    public void addTopicPartitionToParticipatingTransactions(TopicPartition tp) {
+        this.participatingPartitions.add(tp);
+    }
+
+    /**
+     * Resets the current transaction context so that a new transaction can start
+     */
+    private void clearTransactionContext() {
+        this.txId = -1;
+        this.participatingPartitions.clear();
+    }
+
+    /**
+     * Blocks until notified after the receipt of a tx response, or until the wait time expires
+     * @param waitTimeMs maximum time to wait, in milliseconds
+     */
+    public void waitForTxResponse(long waitTimeMs) {
+        synchronized (this) {
+            try {
+                wait(waitTimeMs);
+            } catch (InterruptedException e) {
+                /* woken up early; the caller re-checks for the response */
+            }
+        }
+    }
+
+    /**
+     * Get the producerGroupId configured on startup
+     */
+    public String getTxGroupId() {
+        return this.txGroupId;
+    }
+
+    /**
+     * Sets the transaction coordinator node info
+     */
+    public void setTxCoordinatorNode(Node txCoordinatorNode) {
+        log.debug("Update transaction coordinator node: {} ", txCoordinatorNode);
+        this.txCoordinatorNode = txCoordinatorNode;
+        synchronized (this) {
+            this.notify(); // unblock ensureTxCoordinatorInfoIsAvailable
+        }
+    }
+
+    /**
+     * Get the transaction coordinator node
+     */
+    public Node getTxCoordinatorNode() {
+        return this.txCoordinatorNode;
+    }
+
+    /**
+     * Sets a new transaction request that is to be sent
+     */
+    public void txRequestAvailable(TransactionRequest tr) {
+        this.pendingTransactionRequest.set(tr);
+    }
+
+    /**
+     * Makes the transaction response available and notifies its reception
+     * @param tr the received transaction response
+     */
+    public void txResponseAvailable(TransactionResponse tr) {
+        log.trace("Setting tx response: {}", tr);
+        this.transactionResponse.set(tr);
+        synchronized (this) {
+            notify();
+        }
+        log.trace("Tx response set and notified");
+    }
+
+    /**
+     * Checks if the transaction coordinator is available
+     */
+    private boolean isTxCoordinatorAvailable() {
+        return this.txCoordinatorNode != null;
+    }
+
+    public TransactionResponse getTxResponse() {
+        return this.transactionResponse.getAndSet(null);
+    }
+
+    public boolean isTxRequestAvailable() {
+        // checking for a pending request must not consume it
+        return this.pendingTransactionRequest.get() != null;
+    }
+
+    public TransactionRequest getTxRequest() {
+        return this.getAndClearTxRequest();
+    }
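+
+    /*
+     * Illustrative request/response hand-off implied by the accessors above
+     * (the thread roles are an assumption): the client thread publishes a
+     * request via txRequestAvailable() and blocks in waitForTxResponse(); the
+     * sender thread polls isTxRequestAvailable(), sends getTxRequest() to the
+     * coordinator, and on reply calls txResponseAvailable(), which wakes the
+     * client so it can consume getTxResponse().
+     */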
+
+    private TransactionRequest getAndClearTxRequest() {
+        return this.pendingTransactionRequest.getAndSet(null);
+    }
+
+    @Override
+    public String toString() {
+        return "Tx-Id: " + txId + " Status: " + txStatus + " #InvolvedPartitions: " + participatingPartitions.size();
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java
new file mode 100644
index 0000000..05f3a9d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+public enum TransactionControl {
+    BEGIN(1),
+    COMMIT(2),
+    ABORT(5);
+
+    private final short txControlCode;
+
+    private TransactionControl(int txControlCode) {
+        this.txControlCode = (short) txControlCode;
+    }
+
+    public static short codeFor(TransactionControl txControl) {
+        return txControl.txControlCode;
+    }
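+
+    /*
+     * Illustrative mapping: codeFor(BEGIN) == 1, codeFor(COMMIT) == 2, codeFor(ABORT) == 5.
+     * These shorts are presumably the control codes carried in TransactionRequest on the wire.
+     */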
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java
new file mode 100644
index 0000000..81b24aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when no valid transaction coordinator is available, so transactional operations cannot proceed.
+ * This is retriable, since a coordinator may become available later.
+ */
+public class TransactionCoordinatorNotAvailableException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TransactionCoordinatorNotAvailableException() {
+    }
+
+    public TransactionCoordinatorNotAvailableException(String message) {
+        super(message);
+    }
+
+    public TransactionCoordinatorNotAvailableException(Throwable throwable) {
+        super(throwable);
+    }
+
+    public TransactionCoordinatorNotAvailableException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java
new file mode 100644
index 0000000..d2b9440
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when the current transaction failed and should be aborted by the client.
+ */
+public class TransactionFailedException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TransactionFailedException() {
+    }
+
+    public TransactionFailedException(String message) {
+        super(message);
+    }
+
+    public TransactionFailedException(Throwable throwable) {
+        super(throwable);
+    }
+
+    public TransactionFailedException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 0323f5f..2798ca1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -80,7 +80,7 @@ public class Compressor {
     public ByteBuffer buffer() {
         return bufferStream.buffer();
     }
- 
+
     public double compressionRate() {
         ByteBuffer buffer = bufferStream.buffer();
         if (this.writtenUncompressed == 0)
@@ -103,7 +103,7 @@ public class Compressor {
         buffer.position(initPos);
         buffer.putLong(numRecords - 1);
         buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);
-        // write the shallow message (the crc and value size are not correct yet) 
+        // write the shallow message (the crc and value size are not correct yet)
         Record.write(buffer, null, null, type, 0, -1);
         // compute and fill the value size
         int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
@@ -167,19 +167,20 @@ public class Compressor {
         }
     }

-    public void putRecord(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+    public void putRecord(byte[] key, byte[] value, int txId, CompressionType type, int valueOffset, int valueSize) {
         // put a record as un-compressed into the underlying stream
-        long crc = Record.computeChecksum(key, value, type, valueOffset, valueSize);
+        long crc = Record.computeChecksum(key, value, txId, type, valueOffset, valueSize);
         byte attributes = Record.computeAttributes(type);
-        putRecord(crc, attributes, key, value, valueOffset, valueSize);
+        putRecord(crc, attributes, key, value, txId, valueOffset, valueSize);
     }

-    public void putRecord(byte[] key, byte[] value) {
-        putRecord(key, value, CompressionType.NONE, 0, -1);
+    public void putRecord(byte[] key, byte[] value, int txId) {
+        putRecord(key, value, txId, CompressionType.NONE, 0, -1);
     }
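+
+    // Note: callers that write a record outside any transaction pass txId -1,
+    // which appears to act as the "no transaction" sentinel (see Record.write).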

-    private void putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
-        Record.write(this, crc, attributes, key, value, valueOffset, valueSize);
+    private void putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int txId,
+                           final int valueOffset, final int valueSize) {
+        Record.write(this, crc, attributes, key, value, txId, valueOffset, valueSize);
     }

     public void recordWritten(int size) {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 040e5b9..a64c16f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -3,9 +3,9 @@
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
- * 
+ *
 * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
@@ -77,20 +77,20 @@ public class MemoryRecords implements Records {
     /**
      * Append a new record and offset to the buffer
      */
-    public void append(long offset, byte[] key, byte[] value) {
+    public void append(long offset, byte[] key, byte[] value, int txId) {
         if (!writable)
             throw new IllegalStateException("Memory records is not writable");

         int size = Record.recordSize(key, value);
         compressor.putLong(offset);
         compressor.putInt(size);
-        compressor.putRecord(key, value);
+        compressor.putRecord(key, value, txId);
         compressor.recordWritten(size + Records.LOG_OVERHEAD);
     }

     /**
      * Check if we have room for a new record containing the given key/value pair
- * 
+ *
      * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
      * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
      * re-allocation in the underlying byte buffer stream.
@@ -132,7 +132,7 @@ public class MemoryRecords implements Records {
     public int sizeInBytes() {
         return compressor.buffer().position();
     }
- 
+
     /**
      * The compression rate of this record set
      */
@@ -179,7 +179,7 @@ public class MemoryRecords implements Records {

     /*
      * Read the next record from the buffer.
- * 
+ *
      * Note that in the compressed message set, each message value size is set as the size of the un-compressed
      * version of the message value, so when we do de-compression allocating an array of the specified size for
      * reading compressed value data is sufficient.
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 10df9fd..f31b70e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -34,9 +34,11 @@ public final class Record {
     public static final int CRC_LENGTH = 4;
     public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
     public static final int MAGIC_LENGTH = 1;
+    public static final int TXID_LENGTH = 4;
     public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
     public static final int ATTRIBUTE_LENGTH = 1;
-    public static final int KEY_SIZE_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int TXID_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int KEY_SIZE_OFFSET = TXID_OFFSET + TXID_LENGTH;
     public static final int KEY_SIZE_LENGTH = 4;
     public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH;
     public static final int VALUE_SIZE_LENGTH = 4;
@@ -44,7 +46,7 @@ public final class Record {
     /**
      * The size for the record header
      */
-    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
+    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TXID_LENGTH;
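+
+    /*
+     * Record layout with the transaction id (derived from the offset constants above):
+     *   crc (4) | magic (1) | attributes (1) | txid (4) | key size (4) | key | value size (4) | value
+     */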

     /**
      * The amount of overhead bytes in a record
@@ -77,7 +79,7 @@ public final class Record {
     * A constructor to create a LogRecord. If the record's compression type is not none, then
     * its value payload should be already compressed with the specified type; the constructor
     * would always write the value payload as is and will not do the compression itself.
- * 
+ *
     * @param key The key of the record (null, if none)
     * @param value The record value
     * @param type The compression type used on the contents of the record (if any)
@@ -113,16 +115,18 @@ public final class Record {
         // construct the compressor with compression type none since this function will not do any
         // compression according to the input type, it will just write the record's payload as is
         Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity());
-        compressor.putRecord(key, value, type, valueOffset, valueSize);
+        compressor.putRecord(key, value, -1, type, valueOffset, valueSize);
     }

-    public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) {
+    public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int txId, int valueOffset, int valueSize) {
         // write crc
         compressor.putInt((int) (crc & 0xffffffffL));
         // write magic value
         compressor.putByte(CURRENT_MAGIC_VALUE);
         // write attributes
         compressor.putByte(attributes);
+        // write the txId
+        compressor.putInt(txId);
         // write the key
         if (key == null) {
             compressor.putInt(-1);
@@ -145,7 +149,7 @@ public final class Record {
     }

     public static int recordSize(int keySize, int valueSize) {
-        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
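+        // e.g. a record with a 3-byte key and a 5-byte value now occupies
+        // 4 (crc) + 1 (magic) + 1 (attributes) + 4 (txId) + 4 (key size) + 3 + 4 (value size) + 5 = 26 bytes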
+        return CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + TXID_LENGTH + KEY_SIZE_LENGTH + keySize + VALUE_SIZE_LENGTH + valueSize;
     }

     public ByteBuffer buffer() {
@@ -171,13 +175,15 @@ public final class Record {
     /**
      * Compute the checksum of the record from the attributes, key and value payloads
      */
-    public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) {
+    public static long computeChecksum(byte[] key, byte[] value, int txId, CompressionType type, int valueOffset, int valueSize) {
         Crc32 crc = new Crc32();
         crc.update(CURRENT_MAGIC_VALUE);
         byte attributes = 0;
         if (type.id > 0)
             attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
         crc.update(attributes);
+        // update for the txId
+        crc.updateInt(txId);
         // update for the key
         if (key == null) {
             crc.updateInt(-1);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 0762b35..23381ce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -3,9 +3,9 @@
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
- * 
+ *
 * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
@@ -61,10 +61,10 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, CompressionType.NONE, null, 0);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        accum.append(tp1, key, value, CompressionType.NONE, null, 0);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
         assertEquals(1, batches.size());
@@ -82,7 +82,7 @@ public class RecordAccumulatorTest {
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
-        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
+        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null, 0);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
     }

@@ -90,7 +90,7 @@ public class RecordAccumulatorTest {
     public void testLinger() throws Exception {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        accum.append(tp1, key, value, CompressionType.NONE, null, 0);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -111,7 +111,7 @@ public class RecordAccumulatorTest {
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, CompressionType.NONE, null);
+                accum.append(tp, key, value, CompressionType.NONE, null, 0);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);

@@ -131,7 +131,7 @@ public class RecordAccumulatorTest {
             public void run() {
                 for (int i = 0; i < msgs; i++) {
                     try {
-                        accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
+                        accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null, 0);
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index ef2ca65..5e1caaf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -3,9 +3,9 @@
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
- * 
+ *
 * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
@@ -69,7 +69,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         int offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null, -1).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -95,7 +95,7 @@ public class SenderTest {
                                    new Metrics(),
                                    time);
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null, -1).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -112,7 +112,7 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());

         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null, -1).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java
new file mode 100644
index 0000000..49befc9
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java
@@ -0,0 +1,84 @@
+package org.apache.kafka.clients.producer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.kafka.clients.producer.internals.TransactionContext;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+public class TransactionContextTest {
+
+    @Test
+    public void testTransactionStatus() {
+        // Initial status, only begin is legal
+        TransactionContext txContext = new TransactionContext("0");
+        assertTrue(txContext.canBeginTransaction());
+        assertFalse(txContext.canCommitTransaction());
+
+        // start transaction, a second begin is illegal
+        txContext.beginTransaction(0);
+        assertFalse(txContext.canBeginTransaction());
+        try {
+            txContext.beginTransaction(1);
+            fail("Expected InvalidTransactionStatusException");
+        } catch (InvalidTransactionStatusException e) {
+            // expected
+        }
+
+        // abort transaction, only begin is legal. Repeated aborts are ignored
+        txContext.abortTransaction();
+        try {
+            txContext.commitTransaction();
+            fail("Expected InvalidTransactionStatusException");
+        } catch (InvalidTransactionStatusException e) {
+            // expected
+        }
+        txContext.abortTransaction();
+        txContext.abortTransaction();
+        txContext.beginTransaction(0);
+
+        // after a commit, only begin is legal
+        TransactionContext txContext2 = new TransactionContext("0");
+        assertTrue(txContext2.canBeginTransaction());
+        txContext2.beginTransaction(0);
+        txContext2.addTopicPartitionToParticipatingTransactions(new TopicPartition("topic", 0));
+        assertTrue(txContext2.canCommitTransaction());
+        txContext2.commitTransaction();
+        assertTrue(txContext2.canBeginTransaction());
+        try {
+            txContext2.commitTransaction();
+            fail("Expected InvalidTransactionStatusException");
+        } catch (InvalidTransactionStatusException e) {
+            // expected
+        }
+        try {
+            txContext2.abortTransaction();
+            fail("Expected InvalidTransactionStatusException");
+        } catch (InvalidTransactionStatusException e) {
+            // expected
+        }
+        txContext2.beginTransaction(0);
+    }
+
+    @Test
+    public void testClearTransactionContext() {
+        TransactionContext txContext = new TransactionContext("0");
+        assertTrue(txContext.canBeginTransaction());
+        txContext.beginTransaction(0);
+        TopicPartition tp1 = new TopicPartition("0", 0);
+        TopicPartition tp2 = new TopicPartition("1", 1);
+        txContext.addTopicPartitionToParticipatingTransactions(tp1);
+        txContext.addTopicPartitionToParticipatingTransactions(tp2);
+        // adding the same partitions again must not create duplicates
+        txContext.addTopicPartitionToParticipatingTransactions(tp1);
+        txContext.addTopicPartitionToParticipatingTransactions(tp2);
+
+        assertTrue(txContext.getTxId() >= 0);
+        assertEquals(2, txContext.getParticipatingPartitions().size());
+        assertTrue(txContext.isTherePendingTransaction());
+
+        txContext.commitTransaction();
+
+        assertEquals(-1, txContext.getTxId());
+        assertTrue(txContext.getParticipatingPartitions().isEmpty());
+        assertFalse(txContext.isTherePendingTransaction());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 94a1112..a4ce206 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -39,6 +39,7 @@ public class MemoryRecordsTest {

     @Test
     public void testIterator() {
+        int txId = -1;
         MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
         MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
         List<Record> list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()),
@@ -47,7 +48,7 @@ public class MemoryRecordsTest {
         for (int i = 0; i < list.size(); i++) {
             Record r = list.get(i);
             recs1.append(i, r);
-            recs2.append(i, toArray(r.key()), toArray(r.value()));
+            recs2.append(i, toArray(r.key()), toArray(r.value()), txId);
         }
         recs1.close();
         recs2.close();
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index 2765913..1569b53 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -66,7 +66,7 @@ public class RecordTest {
         assertEquals(record.checksum(), record.computeChecksum(
                 this.key == null ? null : this.key.array(),
                 this.value == null ? null : this.value.array(),
-                this.compression, 0, -1));
+                -1, this.compression, 0, -1));
         assertTrue(record.isValid());
         for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
             Record copy = copyOf(record);
-- 
1.7.12.4