From 89ac014fdf9b75aa79bb14915039701ba0a41a12 Mon Sep 17 00:00:00 2001
From: Mayuresh Gharat
Date: Thu, 23 Jul 2015 21:23:08 -0700
Subject: [PATCH 1/7] Addressed Joel's, Adi's, Dong's, Becket's comments

---
 .../org/apache/kafka/clients/ClientRequest.java    |  8 +++
 .../apache/kafka/clients/CommonClientConfigs.java  |  6 ++
 .../org/apache/kafka/clients/InFlightRequests.java | 32 ++++++++-
 .../java/org/apache/kafka/clients/KafkaClient.java |  2 +-
 .../org/apache/kafka/clients/NetworkClient.java    | 27 ++++++--
 .../kafka/clients/consumer/ConsumerConfig.java     | 10 +++
 .../kafka/clients/consumer/KafkaConsumer.java      |  3 +-
 .../kafka/clients/producer/KafkaProducer.java      | 65 +++++++++++++++---
 .../kafka/clients/producer/ProducerConfig.java     | 36 +++++++++-
 .../clients/producer/internals/BufferPool.java     | 20 ++----
 .../producer/internals/RecordAccumulator.java      | 56 ++++++++++++----
 .../clients/producer/internals/RecordBatch.java    | 28 +++++++-
 .../kafka/clients/producer/internals/Sender.java   | 21 ++++--
 .../org/apache/kafka/common/network/Selector.java  | 10 ++-
 .../java/org/apache/kafka/clients/MockClient.java  |  2 +-
 .../apache/kafka/clients/NetworkClientTest.java    | 31 +++++++--
 .../clients/producer/internals/BufferPoolTest.java | 57 +++++++++-------
 .../producer/internals/RecordAccumulatorTest.java  | 77 +++++++++++++++-------
 .../clients/producer/internals/SenderTest.java     | 21 +++---
 19 files changed, 394 insertions(+), 118 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index dc8f0f1..6410f09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -24,6 +24,7 @@ public final class ClientRequest {
     private final RequestSend request;
     private final RequestCompletionHandler callback;
     private final boolean isInitiatedByNetworkClient;
+    private long sendMs;

     /**
      * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
@@ -86,4 +87,11 @@ public final class ClientRequest {
         return isInitiatedByNetworkClient;
     }

+    public long getSendMs() {
+        return sendMs;
+    }
+
+    public void setSendMs(long sendMs) {
+        this.sendMs = sendMs;
+    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 7d24c6f..48e4919 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -61,4 +61,10 @@ public class CommonClientConfigs {

     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
     public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
+
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
+    public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait "
+                                                        + "for the response of a request. If the response is not received before the timeout "
+                                                        + "elapses the client will resend the request if necessary or fail the request if "
+                                                        + "retries are exhausted.";
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 15d00d4..64f7bf3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -16,7 +16,10 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+
 /**
  * The set of requests which have been sent or are being sent but haven't yet received a response
@@ -33,12 +36,13 @@ final class InFlightRequests {
     /**
      * Add the given request to the queue for the connection it was directed to
      */
-    public void add(ClientRequest request) {
+    public void add(ClientRequest request, long now) {
         Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
         if (reqs == null) {
             reqs = new ArrayDeque<ClientRequest>();
             this.requests.put(request.request().destination(), reqs);
         }
+        request.setSendMs(now);
         reqs.addFirst(request);
     }

@@ -123,4 +127,27 @@
     }

+    /**
+     * Returns the nodes that have a pending in-flight request which has been outstanding for
+     * longer than the request timeout
+     *
+     * @param now current time in milliseconds
+     * @param requestTimeout max time in milliseconds to wait for a request to complete
+     * @return the list of node ids
+     */
+    public Iterable<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
+        List<String> nodeIds = new LinkedList<String>();
+        if (requests.size() > 0) {
+            for (String nodeId : requests.keySet()) {
+                if (inFlightRequestCount(nodeId) > 0) {
+                    ClientRequest request = lastSent(nodeId);
+                    long timeSinceSend = now - request.getSendMs();
+                    if (timeSinceSend > requestTimeout) {
+                        nodeIds.add(nodeId);
+                    }
+                }
+            }
+        }
+
+        return nodeIds;
+    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 7ab2503..a07e84e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -69,7 +69,8 @@ public interface KafkaClient extends Closeable {
      *
      * @param request The request
+     * @param now The current time in milliseconds
      */
-    public void send(ClientRequest request);
+    public void send(ClientRequest request, long now);

     /**
      * Do actual reads and writes from sockets.
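The expiry rule above can be read in isolation: requests are pushed onto each node's deque with addFirst(), so lastSent() returns the newest in-flight request, and a node is reported only when even that newest request has been outstanding for longer than requestTimeout; any older request has then necessarily waited at least as long. Below is a minimal, self-contained sketch of the same rule, using hypothetical names (InFlightExpirySketch, recordSend) rather than the Kafka classes:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InFlightExpirySketch {
    // the newest send timestamp sits at the head of each deque, mirroring addFirst() above
    private final Map<String, Deque<Long>> sendTimesByNode = new HashMap<String, Deque<Long>>();

    public void recordSend(String nodeId, long now) {
        Deque<Long> sends = sendTimesByNode.get(nodeId);
        if (sends == null) {
            sends = new ArrayDeque<Long>();
            sendTimesByNode.put(nodeId, sends);
        }
        sends.addFirst(now);
    }

    // a node is timed out only when even its most recently sent request is older than the timeout
    public List<String> nodesWithTimedOutRequests(long now, int requestTimeout) {
        List<String> expired = new ArrayList<String>();
        for (Map.Entry<String, Deque<Long>> entry : sendTimesByNode.entrySet()) {
            Long newestSendMs = entry.getValue().peekFirst();
            if (newestSendMs != null && now - newestSendMs > requestTimeout)
                expired.add(entry.getKey());
        }
        return expired;
    }
}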
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b31f7f1..0fa6335 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -79,13 +79,17 @@ public class NetworkClient implements KafkaClient {
     /* the last timestamp when no broker node is available to connect */
     private long lastNoNodeAvailableMs;

+    /* the max time in ms the client will wait for the response to a request */
+    private final int requestTimeout;
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
                          int maxInFlightRequestsPerConnection,
                          long reconnectBackoffMs,
                          int socketSendBuffer,
-                         int socketReceiveBuffer) {
+                         int socketReceiveBuffer,
+                         int requestTimeoutMs) {
         this.selector = selector;
         this.metadata = metadata;
         this.clientId = clientId;
@@ -97,6 +101,7 @@
         this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
         this.metadataFetchInProgress = false;
         this.lastNoNodeAvailableMs = 0;
+        this.requestTimeout = requestTimeoutMs;
     }

     /**
@@ -189,12 +194,11 @@
      * @param request The request
      */
     @Override
-    public void send(ClientRequest request) {
+    public void send(ClientRequest request, long now) {
         String nodeId = request.request().destination();
         if (!canSendRequest(nodeId))
             throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
-
-        this.inFlightRequests.add(request);
+        this.inFlightRequests.add(request, now);
         selector.send(request.request());
     }

@@ -227,6 +231,7 @@
         List<ClientResponse> responses = new ArrayList<ClientResponse>();
         handleCompletedSends(responses, now);
         handleCompletedReceives(responses, now);
+        handleTimedOutRequests(now);
         handleDisconnections(responses, now);
         handleConnections();

@@ -348,6 +353,18 @@
     }

     /**
+     * Iterate over all the in-flight requests and expire any that have exceeded the configured requestTimeout.
+     * The connection to the node associated with such a request is terminated and treated as a disconnection,
+     * which is subsequently handled in handleDisconnections().
+     */
+    private void handleTimedOutRequests(long now) {
+        for (String nodeId : this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeout)) {
+            // disconnect the connection to the node
+            this.selector.disconnect(nodeId);
+        }
+    }
+
+    /**
      * Handle any completed request send. In particular if no response is expected consider the request complete.
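+     * A produce request sent with acks=0 is the main such case: no response will ever arrive for it,
+     * so it is completed here, as soon as its send finishes, rather than in handleCompletedReceives().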
* * @param responses The list of responses to update @@ -478,7 +495,7 @@ public class NetworkClient implements KafkaClient { ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); this.selector.send(metadataRequest.request()); - this.inFlightRequests.add(metadataRequest); + this.inFlightRequests.add(metadataRequest, now); } else if (connectionStates.canConnect(nodeConnectionId, now)) { // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index b9a2d4e..858fef6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -159,6 +159,10 @@ public class ConsumerConfig extends AbstractConfig { /** connections.max.idle.ms */ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; + /** request.timeout.ms */ + public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC ; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, @@ -298,6 +302,12 @@ public class ConsumerConfig extends AbstractConfig { .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) + .define(REQUEST_TIMEOUT_MS_CONFIG, + Type.INT, + 10 * 1000, + atLeast(0), + Importance.MEDIUM, + REQUEST_TIMEOUT_MS_DOC) /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 938981c..8fef2bb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -527,7 +527,8 @@ public class KafkaConsumer implements Consumer { 100, // a fixed large enough value will suffice config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), - config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG)); + config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), + config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs); OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase()); this.subscriptions = new SubscriptionState(offsetResetStrategy); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 804d569..ae78331 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -128,7 +128,6 @@ public class KafkaProducer implements Producer { private String clientId; private final Partitioner partitioner; private final int maxRequestSize; - private final long metadataFetchTimeoutMs; private final long totalMemorySize; private final Metadata metadata; private final RecordAccumulator accumulator; @@ -141,6 +140,8 @@ public class KafkaProducer implements Producer { private final Serializer keySerializer; private final Serializer valueSerializer; private final ProducerConfig producerConfig; + private final long maxBlockTimeMs; + private final int requestTimeoutMs; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -197,8 +198,10 @@ public class KafkaProducer implements Producer { private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer) { try { log.trace("Starting the Kafka producer"); + Map userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = new SystemTime(); + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); @@ -211,11 +214,45 @@ public class KafkaProducer implements Producer { this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); - this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + /* check for user defined settings. + * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG. + * This should be removed with release 0.9 + */ + if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) { + log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " + + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); + boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG); + if (blockOnBufferFull) { + this.maxBlockTimeMs = Long.MAX_VALUE; + } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { + log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); + this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + } else { + this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); + } + } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { + log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. 
" + + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); + this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + } else { + this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); + } + + /* check for user defined settings. + * If the TIME_OUT config is set use that for request timeout. + * This should be removed with release 0.9 + */ + if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) { + this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG); + } else { + this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); + } + Map metricTags = new LinkedHashMap(); metricTags.put("client-id", clientId); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), @@ -223,7 +260,6 @@ public class KafkaProducer implements Producer { this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, - config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time, metricTags); @@ -237,17 +273,18 @@ public class KafkaProducer implements Producer { config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), - config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), + this.requestTimeoutMs); this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), - config.getInt(ProducerConfig.TIMEOUT_CONFIG), this.metrics, new SystemTime(), - clientId); + clientId, + this.requestTimeoutMs); String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? 
" | " + clientId : ""); this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); @@ -367,7 +404,8 @@ public class KafkaProducer implements Producer { public Future send(ProducerRecord record, Callback callback) { try { // first make sure the metadata for the topic is available - waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); + long startTime = time.milliseconds(); + waitOnMetadata(record.topic(), this.maxBlockTimeMs); byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.key()); @@ -389,7 +427,13 @@ public class KafkaProducer implements Producer { ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback); + // check remaining blocking time + long elapsedTime = time.milliseconds() - startTime; + if (elapsedTime > maxBlockTimeMs) { + throw new TimeoutException("Request timed out"); + } + long remainingTime = maxBlockTimeMs - elapsedTime; + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); @@ -508,7 +552,7 @@ public class KafkaProducer implements Producer { @Override public List partitionsFor(String topic) { try { - waitOnMetadata(topic, this.metadataFetchTimeoutMs); + waitOnMetadata(topic, this.maxBlockTimeMs); } catch (InterruptedException e) { throw new InterruptException(e); } @@ -628,7 +672,8 @@ public class KafkaProducer implements Producer { + "]."); return partition; } - return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); + return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, + cluster); } private static class FutureFailure implements Future { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 06f00a9..d9e0c81 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -45,6 +45,9 @@ public class ProducerConfig extends AbstractConfig { public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; /** metadata.fetch.timeout.ms */ + /** + * @deprecated This config will be removed soon. Please use {@link #MAX_BLOCK_MS_CONFIG} + */ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This " + "fetch to succeed before throwing an exception back to the client."; @@ -93,6 +96,10 @@ public class ProducerConfig extends AbstractConfig { + " remains alive. This is the strongest available guarantee."; /** timeout.ms */ + /** + *@deprecated This config will be removed soon. 
+     */
+    @Deprecated
     public static final String TIMEOUT_CONFIG = "timeout.ms";
     private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to "
                                               + "meet the acknowledgment requirements the producer has specified with the acks configuration. If the "
                                               + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
@@ -128,6 +135,11 @@
     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;

     /** block.on.buffer.full */
+    /**
+     * @deprecated This config will be removed soon. Currently, if this property is set to true, the METADATA_FETCH_TIMEOUT_CONFIG will not be honored.
+     * Please use {@link #MAX_BLOCK_MS_CONFIG}.
+     */
+    @Deprecated
     public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
     private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. "
                                                            + "By default this setting is false and the producer will block for up to max.block.ms and then throw a TimeoutException. "
                                                            + "Setting this property to true will restore the old behavior of blocking indefinitely until memory is available.";
@@ -177,6 +189,16 @@
     public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
     private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the Partitioner interface.";

+    /** max.block.ms */
+    public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
+    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} will block. "
" + + "For e.g: buffer full, metadata unavailable etc."; + + /** request.timeout.ms */ + public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC ; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -200,7 +222,7 @@ public class ProducerConfig extends AbstractConfig { atLeast(0), Importance.MEDIUM, MAX_REQUEST_SIZE_DOC) - .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) + .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) @@ -210,6 +232,18 @@ public class ProducerConfig extends AbstractConfig { atLeast(0), Importance.LOW, METADATA_FETCH_TIMEOUT_DOC) + .define(MAX_BLOCK_MS_CONFIG, + Type.LONG, + 60 * 1000, + atLeast(0), + Importance.MEDIUM, + MAX_BLOCK_MS_DOC) + .define(REQUEST_TIMEOUT_MS_CONFIG, + Type.INT, + 10 * 1000, + atLeast(0), + Importance.MEDIUM, + REQUEST_TIMEOUT_MS_DOC) .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC) .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 4cb1e50..09e44fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -24,8 +24,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Rate; @@ -46,7 +46,6 @@ public final class BufferPool { private final long totalMemory; private final int poolableSize; - private final boolean blockOnExhaustion; private final ReentrantLock lock; private final Deque free; private final Deque waiters; @@ -60,17 +59,13 @@ public final class BufferPool { * * @param memory The maximum amount of memory that this buffer pool can allocate * @param poolableSize The buffer size to cache in the free list rather than deallocating - * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the - * {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false - * {@link #allocate(int)} will throw an exception if the buffer is out of memory. 
* @param metrics instance of Metrics * @param time time instance * @param metricGrpName logical group name for metrics * @param metricTags additional key/val attributes for metrics */ - public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time , String metricGrpName , Map metricTags) { + public BufferPool(long memory, int poolableSize, Metrics metrics, Time time , String metricGrpName , Map metricTags) { this.poolableSize = poolableSize; - this.blockOnExhaustion = blockOnExhaustion; this.lock = new ReentrantLock(); this.free = new ArrayDeque(); this.waiters = new ArrayDeque(); @@ -95,9 +90,8 @@ public final class BufferPool { * @throws InterruptedException If the thread is interrupted while blocked * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block * forever) - * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool */ - public ByteBuffer allocate(int size) throws InterruptedException { + public ByteBuffer allocate(int size, long maxTimeToBlock) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " @@ -120,10 +114,6 @@ public final class BufferPool { this.availableMemory -= size; lock.unlock(); return ByteBuffer.allocate(size); - } else if (!blockOnExhaustion) { - throw new BufferExhaustedException("You have exhausted the " + this.totalMemory - + " bytes of memory you configured for the client and the client is configured to error" - + " rather than block when memory is exhausted."); } else { // we are out of memory and will have to block int accumulated = 0; @@ -134,7 +124,9 @@ public final class BufferPool { // enough memory to allocate one while (accumulated < size) { long startWait = time.nanoseconds(); - moreMemory.await(); + if (!moreMemory.await(maxTimeToBlock, TimeUnit.MILLISECONDS)) { + throw new TimeoutException("Failed to allocate memory within the configured max blocking time"); + } long endWait = time.nanoseconds(); this.waitTime.record(endWait - startWait, time.milliseconds()); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index a152bd7..7d8cab4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -12,12 +12,14 @@ */ package org.apache.kafka.clients.producer.internals; +import java.util.Iterator; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -82,8 +84,6 @@ public final class RecordAccumulator { * latency for potentially better throughput due to more batching (and hence fewer, larger requests). * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids * exhausting all retries in a short period of time. 
- * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of - * memory * @param metrics The metrics * @param time The time instance to use * @param metricTags additional key/value attributes of the metric @@ -93,7 +93,6 @@ public final class RecordAccumulator { CompressionType compression, long lingerMs, long retryBackoffMs, - boolean blockOnBufferFull, Metrics metrics, Time time, Map metricTags) { @@ -107,7 +106,7 @@ public final class RecordAccumulator { this.retryBackoffMs = retryBackoffMs; this.batches = new CopyOnWriteMap>(); String metricGrpName = "producer-metrics"; - this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags); + this.free = new BufferPool(totalSize, batchSize, metrics, time , metricGrpName , metricTags); this.incomplete = new IncompleteRecordBatches(); this.time = time; registerMetrics(metrics, metricGrpName, metricTags); @@ -154,7 +153,7 @@ public final class RecordAccumulator { * @param value The value for the record * @param callback The user-supplied callback to execute when the request is complete */ - public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException { + public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException { // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); @@ -166,23 +165,23 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch last = dq.peekLast(); if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback); - if (future != null) - return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); + if (future != null) + return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } // we don't have an in-progress record batch try to allocate a new batch int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); - ByteBuffer buffer = free.allocate(size); + ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordBatch last = dq.peekLast(); if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback); + FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... 
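+                        // the buffer allocation above happened outside the deque lock, so a competing
+                        // thread may have created a new batch while this one was blocked in free.allocate();
+                        // the extra buffer is returned to the pool here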
free.deallocate(buffer);
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
@@ -203,6 +202,41 @@
    }

    /**
+     * Abort the batches that have been sitting in the RecordAccumulator for longer than the configured
+     * requestTimeout due to metadata being unavailable
+     */
+    public List<RecordBatch> abortExpiredBatches(int requestTimeout, Cluster cluster, long now) {
+        List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
+        int count = 0;
+        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
+            TopicPartition topicAndPartition = entry.getKey();
+            Deque<RecordBatch> dq = entry.getValue();
+            synchronized (dq) {
+                // iterate over the batches and expire them if they have stayed in the accumulator for more than requestTimeout
+                Iterator<RecordBatch> batchIterator = dq.iterator();
+                while (batchIterator.hasNext()) {
+                    RecordBatch batch = batchIterator.next();
+                    Node leader = cluster.leaderFor(topicAndPartition);
+                    if (batch != null && leader == null) {
+                        // check if the batch is expired
+                        if (batch.expire(requestTimeout, now)) {
+                            expiredBatches.add(batch);
+                            count++;
+                            batchIterator.remove();
+                            deallocate(batch);
+                        }
+                    }
+                }
+            }
+        }
+        if (expiredBatches.size() > 0) {
+            log.trace("Expired {} batches in accumulator", count);
+        }
+
+        return expiredBatches;
+    }
+
+    /**
     * Re-enqueue the given record batch in the accumulator to retry
     */
    public void reenqueue(RecordBatch batch, long now) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 06182db..640a261 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -18,8 +18,9 @@ import java.util.List;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -42,6 +43,7 @@ public final class RecordBatch {
     public final TopicPartition topicPartition;
     public final ProduceRequestResult produceFuture;
     private final List<Thunk> thunks;
+    private long lastAppendTime;

     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
         this.createdMs = now;
@@ -50,6 +52,7 @@
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();
         this.thunks = new ArrayList<Thunk>();
+        this.lastAppendTime = createdMs;
     }

     /**
@@ -57,12 +60,13 @@
      *
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
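+     * @param now the append timestamp; it is saved as the batch's last-append time, which
+     *            expire() later compares against the request timeout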
     */
-    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
+    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            this.records.append(0L, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
+           this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
@@ -116,4 +122,24 @@
    public String toString() {
        return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
    }
+
+    /**
+     * Expire the batch if it has been sitting in the accumulator for longer than requestTimeout
+     * because metadata is unavailable.
+     */
+    public boolean expire(int requestTimeout, long now) {
+        boolean expire = false;
+        if (requestTimeout < (now - this.lastAppendTime)) {
+            expire = true;
+            this.records.close();
+            this.done(-1L, new TimeoutException("Batch Expired"));
+        }
+
+        return expire;
+    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index d2e64f7..094605f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -71,9 +71,6 @@ public class Sender implements Runnable {
     /* the number of acknowledgements to request from the server */
     private final short acks;

-    /* the max time in ms for the server to wait for acknowlegements */
-    private final int requestTimeout;
-
     /* the number of times to retry a failed request before giving up */
     private final int retries;

@@ -92,27 +89,30 @@
     /* param clientId of the client */
     private String clientId;

+    /* the max time to wait for the server to respond to the request */
+    private final int requestTimeout;
+
     public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
                   int maxRequestSize,
                   short acks,
                   int retries,
-                  int requestTimeout,
                   Metrics metrics,
                   Time time,
-                  String clientId) {
+                  String clientId,
+                  int requestTimeout) {
         this.client = client;
         this.accumulator = accumulator;
         this.metadata = metadata;
         this.maxRequestSize = maxRequestSize;
         this.running = true;
-        this.requestTimeout = requestTimeout;
         this.acks = acks;
         this.retries = retries;
         this.time = time;
         this.clientId = clientId;
         this.sensors = new SenderMetrics(metrics);
+        this.requestTimeout = requestTimeout;
     }

     /**
@@ -187,6 +187,13 @@
                                                                          result.readyNodes,
                                                                          this.maxRequestSize,
                                                                          now);
+
+        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
+        // update sensors
+        for (RecordBatch expiredBatch : expiredBatches) {
+            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
+        }
+
         sensors.updateProduceRequestMetrics(batches);
         List<ClientRequest> requests = createProduceRequests(batches, now);
         // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
@@ -200,7 +207,7 @@
             pollTimeout = 0;
         }
         for (ClientRequest request : requests)
-            client.send(request);
+            client.send(request, now);

        // if some partitions are already ready to be
sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index f49d54c..eec738a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -86,6 +86,7 @@ public class Selector implements Selectable { private final long connectionsMaxIdleNanos; private final int maxReceiveSize; private final boolean metricsPerConnection; + private final List clientDisconnects; private long currentTimeNanos; private long nextIdleCloseCheckTime; @@ -118,6 +119,7 @@ public class Selector implements Selectable { currentTimeNanos = new SystemTime().nanoseconds(); nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; this.metricsPerConnection = metricsPerConnection; + this.clientDisconnects = new LinkedList(); } public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map metricTags, ChannelBuilder channelBuilder) { @@ -183,8 +185,10 @@ public class Selector implements Selectable { @Override public void disconnect(String id) { KafkaChannel channel = channelForId(id); - if (channel != null) - channel.disconnect(); + if (channel != null) { + close(channel); + this.clientDisconnects.add(channel.id()); + } } /** @@ -421,7 +425,9 @@ public class Selector implements Selectable { this.connected.clear(); this.disconnected.clear(); this.disconnected.addAll(this.failedSends); + this.disconnected.addAll(this.clientDisconnects); this.failedSends.clear(); + this.clientDisconnects.clear(); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 9133d85..7edf507 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -100,7 +100,7 @@ public class MockClient implements KafkaClient { } @Override - public void send(ClientRequest request) { + public void send(ClientRequest request, long now) { if (!futureResponses.isEmpty()) { FutureResponse futureResp = futureResponses.poll(); if (!futureResp.requestMatcher.matches(request)) diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 43238ce..a3fcce2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -45,13 +45,14 @@ import org.junit.Test; public class NetworkClientTest { + private final int REQUEST_TIMEOUT = 1000; private MockTime time = new MockTime(); private MockSelector selector = new MockSelector(time); private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private int nodeId = 1; private Cluster cluster = TestUtils.singletonCluster("test", nodeId); private Node node = cluster.nodes().get(0); - private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024); + private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, REQUEST_TIMEOUT); @Before public void setup() { @@ -68,7 +69,8 @@ public class NetworkClientTest { selector.disconnect(node.idString()); client.poll(1, time.milliseconds()); 
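+        // the poll above lets the client process the forced disconnect and mark the node as not ready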
selector.clear(); - assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); + assertFalse("After we forced the disconnection the client is no longer ready.", + client.ready(node, time.milliseconds())); assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0); } @@ -78,7 +80,7 @@ public class NetworkClientTest { client.nextRequestHeader(ApiKeys.METADATA), new MetadataRequest(Arrays.asList("test")).toStruct()); ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null); - client.send(request); + client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); } @@ -90,7 +92,7 @@ public class NetworkClientTest { TestCallbackHandler handler = new TestCallbackHandler(); ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler); awaitReady(client, node); - client.send(request); + client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); assertEquals(1, client.inFlightRequestCount()); ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId()); @@ -113,8 +115,25 @@ public class NetworkClientTest { while (!client.ready(node, time.milliseconds())) client.poll(1, time.milliseconds()); } - - private static class TestCallbackHandler implements RequestCompletionHandler { + + @Test + public void testRequestTimeout() { + ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap()); + RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE); + RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct()); + TestCallbackHandler handler = new TestCallbackHandler(); + ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler); + awaitReady(client, node); + long now = time.milliseconds(); + client.send(request, now); + // sleeping to make sure that the time since last send is greater than requestTimeOut + time.sleep(3000); + client.poll(3000, time.milliseconds()); + String disconnectedNode = selector.disconnected().get(0); + assertEquals(node.idString(), disconnectedNode); + } + + private static class TestCallbackHandler implements RequestCompletionHandler { public boolean executed = false; public ClientResponse response; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 2c69382..883b563 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; -import org.apache.kafka.clients.producer.BufferExhaustedException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; @@ -35,6 +35,7 @@ import static org.junit.Assert.*; public class BufferPoolTest { private MockTime time = new MockTime(); private Metrics metrics = new Metrics(time); + private final long MAX_BLOCK_TIME = 2000; String metricGroup = "TestMetrics"; Map metricTags = new LinkedHashMap(); @@ -45,8 +46,8 @@ public class BufferPoolTest { public void testSimple() throws Exception { long totalMemory = 64 * 1024; int size = 1024; - BufferPool pool = new BufferPool(totalMemory, size, 
false, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(size); + BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup, metricTags); + ByteBuffer buffer = pool.allocate(size, MAX_BLOCK_TIME); assertEquals("Buffer size should equal requested size.", size, buffer.limit()); assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory()); assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory()); @@ -55,13 +56,13 @@ public class BufferPoolTest { pool.deallocate(buffer); assertEquals("All memory should be available", totalMemory, pool.availableMemory()); assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory()); - buffer = pool.allocate(size); + buffer = pool.allocate(size, MAX_BLOCK_TIME); assertEquals("Recycled buffer should be cleared.", 0, buffer.position()); assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit()); pool.deallocate(buffer); assertEquals("All memory should be available", totalMemory, pool.availableMemory()); assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory()); - buffer = pool.allocate(2 * size); + buffer = pool.allocate(2 * size, MAX_BLOCK_TIME); pool.deallocate(buffer); assertEquals("All memory should be available", totalMemory, pool.availableMemory()); assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory()); @@ -72,23 +73,11 @@ public class BufferPoolTest { */ @Test(expected = IllegalArgumentException.class) public void testCantAllocateMoreMemoryThanWeHave() throws Exception { - BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(1024); + BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup, metricTags); + ByteBuffer buffer = pool.allocate(1024, MAX_BLOCK_TIME); assertEquals(1024, buffer.limit()); pool.deallocate(buffer); - buffer = pool.allocate(1025); - } - - @Test - public void testNonblockingMode() throws Exception { - BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags); - pool.allocate(1); - try { - pool.allocate(2); - fail("The buffer allocated more than it has!"); - } catch (BufferExhaustedException e) { - // this is good - } + buffer = pool.allocate(1025, MAX_BLOCK_TIME); } /** @@ -96,8 +85,8 @@ public class BufferPoolTest { */ @Test public void testDelayedAllocation() throws Exception { - BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(1024); + BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup, metricTags); + ByteBuffer buffer = pool.allocate(1024, MAX_BLOCK_TIME); CountDownLatch doDealloc = asyncDeallocate(pool, buffer); CountDownLatch allocation = asyncAllocate(pool, 5 * 1024); assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount()); @@ -126,7 +115,7 @@ public class BufferPoolTest { Thread thread = new Thread() { public void run() { try { - pool.allocate(size); + pool.allocate(size, MAX_BLOCK_TIME); } catch (InterruptedException e) { e.printStackTrace(); } finally { @@ -139,6 +128,23 @@ public class BufferPoolTest { } /** + * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time + * + * @throws Exception + 
*/ + @Test + public void testBlockTimeout() throws Exception { + BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup, metricTags); + pool.allocate(1, MAX_BLOCK_TIME); + try { + pool.allocate(2, MAX_BLOCK_TIME); + fail("The buffer allocated more memory than its maximum value 2"); + } catch (TimeoutException e) { + // this is good + } + } + + /** * This test creates lots of threads that hammer on the pool */ @Test @@ -147,7 +153,7 @@ public class BufferPoolTest { final int iterations = 50000; final int poolableSize = 1024; final long totalMemory = numThreads / 2 * poolableSize; - final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags); + final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup, metricTags); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) threads.add(new StressTestThread(pool, iterations)); @@ -163,6 +169,7 @@ public class BufferPoolTest { public static class StressTestThread extends Thread { private final int iterations; private final BufferPool pool; + private final long MAX_BLOCK_TIME_MS = 2000; public final AtomicBoolean success = new AtomicBoolean(false); public StressTestThread(BufferPool pool, int iterations) { @@ -180,7 +187,7 @@ public class BufferPoolTest { else // allocate a random size size = TestUtils.RANDOM.nextInt((int) pool.totalMemory()); - ByteBuffer buffer = pool.allocate(size); + ByteBuffer buffer = pool.allocate(size, MAX_BLOCK_TIME_MS); pool.deallocate(buffer); } success.set(true); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 5b2e4ff..e24f972 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -40,6 +41,8 @@ import org.apache.kafka.common.record.LogEntry; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; import org.junit.Test; public class RecordAccumulatorTest { @@ -63,17 +66,18 @@ public class RecordAccumulatorTest { private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); + private final long MAX_BLOCK_TIME = 1000; @Test public void testFull() throws Exception { long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, null, 
MAX_BLOCK_TIME); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); @@ -91,16 +95,16 @@ public class RecordAccumulatorTest { @Test public void testAppendLarge() throws Exception { int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, new byte[2 * batchSize], null); + RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags); + accum.append(tp1, key, new byte[2 * batchSize], null, MAX_BLOCK_TIME); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, value, null); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -117,12 +121,12 @@ public class RecordAccumulatorTest { @Test public void testPartialDrain() throws Exception { - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); int appends = 1024 / msgSize + 1; List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) - accum.append(tp, key, value, null); + accum.append(tp, key, value, null, MAX_BLOCK_TIME); } assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -136,14 +140,14 @@ public class RecordAccumulatorTest { final int numThreads = 5; final int msgs = 10000; final int numParts = 2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition(topic, i % numParts), key, value, null); + accum.append(new TopicPartition(topic, i % numParts), key, value, null, MAX_BLOCK_TIME); } catch (Exception e) { e.printStackTrace(); } @@ -177,13 +181,13 @@ public class RecordAccumulatorTest { public void testNextReadyCheckDelay() throws Exception { // Next check time will use lingerMs since this test won't trigger any retries/backoff long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, 
metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); // Just short of going over the limit so we trigger linger time int appends = 1024 / msgSize; // Partition on node1 only for (int i = 0; i < appends; i++) - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); @@ -192,14 +196,14 @@ public class RecordAccumulatorTest { // Add partition on node2 only for (int i = 0; i < appends; i++) - accum.append(tp3, key, value, null); + accum.append(tp3, key, value, null, MAX_BLOCK_TIME); result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) - accum.append(tp2, key, value, null); + accum.append(tp2, key, value, null, MAX_BLOCK_TIME); result = accum.ready(cluster, time.milliseconds()); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable @@ -211,10 +215,10 @@ public class RecordAccumulatorTest { public void testRetryBackoff() throws Exception { long lingerMs = Long.MAX_VALUE / 4; long retryBackoffMs = Long.MAX_VALUE / 2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, metricTags); long now = time.milliseconds(); - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); Map> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); @@ -226,7 +230,7 @@ public class RecordAccumulatorTest { accum.reenqueue(batches.get(0).get(0), now); // Put message for partition 1 into accumulator - accum.append(tp2, key, value, null); + accum.append(tp2, key, value, null, MAX_BLOCK_TIME); result = accum.ready(cluster, now + lingerMs + 1); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); @@ -248,9 +252,9 @@ public class RecordAccumulatorTest { @Test public void testFlush() throws Exception { long lingerMs = Long.MAX_VALUE; - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, null); + accum.append(new TopicPartition(topic, i % 3), key, value, null, MAX_BLOCK_TIME); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, 
time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -272,7 +276,7 @@ public class RecordAccumulatorTest { public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -281,7 +285,7 @@ public class RecordAccumulatorTest { } } for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback()); + accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback(), MAX_BLOCK_TIME); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -291,4 +295,33 @@ public class RecordAccumulatorTest { } + @Test + public void testExpiredBatches() throws InterruptedException { + Time time = new SystemTime(); + long now = time.milliseconds(); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); + int appends = 1024 / msgSize; + for (int i = 0; i < appends; i++) { + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); + } + time.sleep(2000); + accum.append(tp1, key, value, null, 0); + Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + Cluster cluster = new Cluster(Arrays.asList(), Arrays.asList()); + now = time.milliseconds(); + List batches = accum.abortExpiredBatches(0, cluster, now); + assertEquals(1, batches.size()); + RecordBatch batch = batches.get(0); + batch.records.close(); + batch.records.flip(); + Iterator iter = batch.records.iterator(); + for (int i = 0; i < appends; i++) { + LogEntry entry = iter.next(); + assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); + assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value()); + } + assertFalse("No more records", iter.hasNext()); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index aa44991..bcf6a3a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -44,10 +44,11 @@ public class SenderTest { private static final int MAX_REQUEST_SIZE = 1024 * 1024; private static final short ACKS_ALL = -1; private static final int MAX_RETRIES = 0; - private static final int REQUEST_TIMEOUT_MS = 10000; private static final String CLIENT_ID = "clientId"; private static final String METRIC_GROUP = "producer-metrics"; private static final double EPS = 0.0001; + private static final int MAX_BLOCK_TIMEOUT = 1000; + private static final int REQUEST_TIMEOUT = 1000; private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new 
MockTime(); @@ -57,17 +58,17 @@ public class SenderTest { private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); - private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags); + private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, metricTags); private Sender sender = new Sender(client, metadata, this.accumulator, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, - REQUEST_TIMEOUT_MS, metrics, time, - CLIENT_ID); + CLIENT_ID, + REQUEST_TIMEOUT); @Before public void setup() { @@ -78,7 +79,7 @@ public class SenderTest { @Test public void testSimple() throws Exception { long offset = 0; - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); @@ -97,7 +98,7 @@ public class SenderTest { public void testQuotaMetrics() throws Exception { final long offset = 0; for (int i = 1; i <= 3; i++) { - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send produce request client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i)); sender.run(time.milliseconds()); @@ -119,12 +120,12 @@ public class SenderTest { MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - REQUEST_TIMEOUT_MS, new Metrics(), time, - "clientId"); + "clientId", + REQUEST_TIMEOUT); // do a successful retry - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals(1, client.inFlightRequestCount()); @@ -141,7 +142,7 @@ public class SenderTest { assertEquals(offset, future.get().offset()); // do an unsuccessful retry - future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send produce request for (int i = 0; i < maxRetries + 1; i++) { client.disconnect(client.requests().peek().request().destination()); -- 1.7.12.4 From 485e4201f133a46d048a80f11fe846e9e6039ec8 Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Mon, 27 Jul 2015 15:30:28 -0700 Subject: [PATCH 2/7] Solved compile error --- .../apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 9517d9d..0b611fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ 
-256,7 +256,7 @@ public class ConsumerNetworkClient implements Closeable { while (iterator.hasNext()) { ClientRequest request = iterator.next(); if (client.ready(node, now)) { - client.send(request); + client.send(request, now); iterator.remove(); requestsSent = true; } else if (client.connectionFailed(node)) { -- 1.7.12.4 From f3fa743c2e88b0f93c755bbc758258d035a7a4c5 Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Wed, 29 Jul 2015 15:52:49 -0700 Subject: [PATCH 3/7] Addressed Jason's comments for KIP-19 --- clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java | 2 +- clients/src/main/java/org/apache/kafka/clients/KafkaClient.java | 1 + clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | 1 + clients/src/test/java/org/apache/kafka/clients/MockClient.java | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 64f7bf3..133d197 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -139,7 +139,7 @@ final class InFlightRequests { */ public Iterable getNodesWithTimedOutRequests(long now, int requestTimeout) { List nodeIds = new LinkedList(); - if (requests.size() > 0) { + if (!requests.isEmpty()) { for (String nodeId : requests.keySet()) { if (inFlightRequestCount(nodeId) > 0) { ClientRequest request = lastSent(nodeId); diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index a07e84e..3201cd3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -68,6 +68,7 @@ public interface KafkaClient extends Closeable { * Queue up the given request for sending. Requests can only be sent on ready connections. * * @param request The request + * @param now The current timestamp */ public void send(ClientRequest request, long now); diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 0fa6335..4b08750 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -192,6 +192,7 @@ public class NetworkClient implements KafkaClient { * Queue up the given request for sending. Requests can only be sent out to ready nodes.
* * @param request The request + * @param now The current timestamp */ @Override public void send(ClientRequest request, long now) { diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 7edf507..3eac546 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -109,6 +109,7 @@ public class MockClient implements KafkaClient { ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody); responses.add(resp); } else { + request.setSendMs(now); this.requests.add(request); } } -- 1.7.12.4 From 5a62d8320437fd6c46cbd3e41bddbe4355b69134 Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Mon, 10 Aug 2015 19:35:27 -0700 Subject: [PATCH 4/7] Addressed Jun's comments --- .../org/apache/kafka/clients/InFlightRequests.java | 4 -- .../org/apache/kafka/clients/NetworkClient.java | 6 +-- .../kafka/clients/consumer/ConsumerConfig.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 38 +++++++++--------- .../kafka/clients/producer/ProducerConfig.java | 3 +- .../clients/producer/internals/BufferPool.java | 5 +-- .../producer/internals/RecordAccumulator.java | 14 +++---- .../clients/producer/internals/RecordBatch.java | 13 ++---- .../kafka/clients/producer/internals/Sender.java | 5 +-- .../apache/kafka/clients/NetworkClientTest.java | 9 ++--- .../clients/producer/internals/BufferPoolTest.java | 24 +++++------ .../producer/internals/RecordAccumulatorTest.java | 38 +++++++++--------- .../apache/kafka/common/network/SelectorTest.java | 6 ++- .../scala/kafka/tools/ProducerPerformance.scala | 1 - .../kafka/api/ProducerFailureHandlingTest.scala | 46 +--------------------- .../test/scala/unit/kafka/utils/TestUtils.scala | 1 - 16 files changed, 81 insertions(+), 134 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 133d197..c919df5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -19,10 +19,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Time; - /** * The set of requests which have been sent or are being sent but haven't yet received a response diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 4b08750..247b0d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -80,7 +80,7 @@ public class NetworkClient implements KafkaClient { private long lastNoNodeAvailableMs; /* max time in ms for the producer to wait for acknowledgement from server*/ - private final int requestTimeout; + private final int requestTimeoutMs; public NetworkClient(Selectable selector, Metadata metadata, @@ -101,7 +101,7 @@ public class NetworkClient implements KafkaClient { this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE); this.metadataFetchInProgress = false; this.lastNoNodeAvailableMs = 0; - this.requestTimeout = requestTimeoutMs; + this.requestTimeoutMs = requestTimeoutMs; } /** @@ -359,7 
+359,7 @@ public class NetworkClient implements KafkaClient { * will be handled in handleDisconnections() subsequently. */ private void handleTimedOutRequests(long now) { - for (String nodeId : this.inFlightRequests.getNodesWithTimedOutRequests(/*currTime,*/now, this.requestTimeout)) { + for (String nodeId : this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs)) { // disconnect the connection to the node this.selector.disconnect(nodeId); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 858fef6..a50f056 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -161,7 +161,7 @@ public class ConsumerConfig extends AbstractConfig { /** request.timeout.ms */ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; - private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC ; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; static { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index ae78331..e141bcd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -223,24 +223,24 @@ public class KafkaProducer implements Producer { * This should be removed with release 0.9 */ if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) { - log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " + - "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); - boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG); - if (blockOnBufferFull) { - this.maxBlockTimeMs = Long.MAX_VALUE; - } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { + log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " + + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); + boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG); + if (blockOnBufferFull) { + this.maxBlockTimeMs = Long.MAX_VALUE; + } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { + log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); + this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + } else { + this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); + } + } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. 
" + - "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - } else { - this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); - } - } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { - log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + - "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); - this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); } else { - this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); + this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); } /* check for user defined settings. @@ -248,9 +248,11 @@ public class KafkaProducer implements Producer { * This should be removed with release 0.9 */ if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) { - this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG); + log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " + + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG); } else { - this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); } Map metricTags = new LinkedHashMap(); @@ -430,7 +432,7 @@ public class KafkaProducer implements Producer { // check remaining blocking time long elapsedTime = time.milliseconds() - startTime; if (elapsedTime > maxBlockTimeMs) { - throw new TimeoutException("Request timed out"); + throw new TimeoutException("Request timed out"); } long remainingTime = maxBlockTimeMs - elapsedTime; RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index d9e0c81..b961290 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -48,6 +48,7 @@ public class ProducerConfig extends AbstractConfig { /** * @deprecated This config will be removed soon. Please use {@link #MAX_BLOCK_MS_CONFIG} */ + @Deprecated public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. 
This " + "fetch to succeed before throwing an exception back to the client."; @@ -197,7 +198,7 @@ public class ProducerConfig extends AbstractConfig { /** request.timeout.ms */ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; - private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC ; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 09e44fb..b98e91f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -124,9 +124,8 @@ public final class BufferPool { // enough memory to allocate one while (accumulated < size) { long startWait = time.nanoseconds(); - if (!moreMemory.await(maxTimeToBlock, TimeUnit.MILLISECONDS)) { - throw new TimeoutException("Failed to allocate memory within the configured max blocking time"); - } + if (!moreMemory.await(maxTimeToBlock, TimeUnit.MILLISECONDS)) + throw new TimeoutException("Failed to allocate memory within the configured max blocking time"); long endWait = time.nanoseconds(); this.waitTime.record(endWait - startWait, time.milliseconds()); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 7d8cab4..a74b62b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -19,7 +19,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -166,8 +165,8 @@ public final class RecordAccumulator { RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); - if (future != null) - return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + if (future != null) + return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } @@ -217,21 +216,20 @@ public final class RecordAccumulator { while (batchIterator.hasNext()) { RecordBatch batch = batchIterator.next(); Node leader = cluster.leaderFor(topicAndPartition); - if (batch != null && (leader == null)) { + if (leader == null) { // check if the batch is expired - if (batch.expire(requestTimeout, now)) { + if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) { expiredBatches.add(batch); count++; - dq.remove(batch); + batchIterator.remove(); deallocate(batch); } } } } } - if (expiredBatches.size() > 0) { + if (expiredBatches.size() > 0) log.trace("Expired {} batches in accumulator", count); - } return expiredBatches; } diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 640a261..4009eef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -21,8 +21,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,16 +123,11 @@ public final class RecordBatch { /** * Expire the batch that is ready but is sitting in accumulator for more than - * requestTimeout due to metadata being unava - * - * - * - * - * lable. + * requestTimeout due to metadata being unavailable. */ - public boolean expire(int requestTimeout, long now) { + public boolean maybeExpire(int requestTimeout, long now, long lingerMs) { boolean expire = false; - if (requestTimeout < (now - this.lastAppendTime)) { + if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.createdMs + lingerMs))) { expire = true; this.records.close(); this.done(-1L, new TimeoutException("Batch Expired")); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 094605f..134d45a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -190,9 +190,8 @@ public class Sender implements Runnable { List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now); // update sensors - for (RecordBatch expiredBatch : expiredBatches) { - this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); - } + for (RecordBatch expiredBatch : expiredBatches) + this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); List requests = createProduceRequests(batches, now); diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index a3fcce2..4d11362 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -45,14 +45,14 @@ import org.junit.Test; public class NetworkClientTest { - private final int REQUEST_TIMEOUT = 1000; + private final int requestTimeoutMs = 1000; private MockTime time = new MockTime(); private MockSelector selector = new MockSelector(time); private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private int nodeId = 1; private Cluster cluster = TestUtils.singletonCluster("test", nodeId); private Node node = cluster.nodes().get(0); - private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, REQUEST_TIMEOUT); + private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs); @Before public void setup() { @@ -133,14 +133,13 @@ public class NetworkClientTest { 
assertEquals(node.idString(), disconnectedNode); } - private static class TestCallbackHandler implements RequestCompletionHandler { + private static class TestCallbackHandler implements RequestCompletionHandler { public boolean executed = false; public ClientResponse response; - + public void onComplete(ClientResponse response) { this.executed = true; this.response = response; } } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 883b563..ded5d3e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -35,7 +35,7 @@ import static org.junit.Assert.*; public class BufferPoolTest { private MockTime time = new MockTime(); private Metrics metrics = new Metrics(time); - private final long MAX_BLOCK_TIME = 2000; + private final long maxBlockTimeMs = 2000; String metricGroup = "TestMetrics"; Map metricTags = new LinkedHashMap(); @@ -47,7 +47,7 @@ public class BufferPoolTest { long totalMemory = 64 * 1024; int size = 1024; BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(size, MAX_BLOCK_TIME); + ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs); assertEquals("Buffer size should equal requested size.", size, buffer.limit()); assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory()); assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory()); @@ -56,13 +56,13 @@ public class BufferPoolTest { pool.deallocate(buffer); assertEquals("All memory should be available", totalMemory, pool.availableMemory()); assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory()); - buffer = pool.allocate(size, MAX_BLOCK_TIME); + buffer = pool.allocate(size, maxBlockTimeMs); assertEquals("Recycled buffer should be cleared.", 0, buffer.position()); assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit()); pool.deallocate(buffer); assertEquals("All memory should be available", totalMemory, pool.availableMemory()); assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory()); - buffer = pool.allocate(2 * size, MAX_BLOCK_TIME); + buffer = pool.allocate(2 * size, maxBlockTimeMs); pool.deallocate(buffer); assertEquals("All memory should be available", totalMemory, pool.availableMemory()); assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory()); @@ -74,10 +74,10 @@ public class BufferPoolTest { @Test(expected = IllegalArgumentException.class) public void testCantAllocateMoreMemoryThanWeHave() throws Exception { BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(1024, MAX_BLOCK_TIME); + ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs); assertEquals(1024, buffer.limit()); pool.deallocate(buffer); - buffer = pool.allocate(1025, MAX_BLOCK_TIME); + buffer = pool.allocate(1025, maxBlockTimeMs); } /** @@ -86,7 +86,7 @@ public class BufferPoolTest { @Test public void testDelayedAllocation() throws Exception { BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(1024, MAX_BLOCK_TIME); + 
ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs); CountDownLatch doDealloc = asyncDeallocate(pool, buffer); CountDownLatch allocation = asyncAllocate(pool, 5 * 1024); assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount()); @@ -115,7 +115,7 @@ public class BufferPoolTest { Thread thread = new Thread() { public void run() { try { - pool.allocate(size, MAX_BLOCK_TIME); + pool.allocate(size, maxBlockTimeMs); } catch (InterruptedException e) { e.printStackTrace(); } finally { @@ -135,9 +135,9 @@ public class BufferPoolTest { @Test public void testBlockTimeout() throws Exception { BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup, metricTags); - pool.allocate(1, MAX_BLOCK_TIME); + pool.allocate(1, maxBlockTimeMs); try { - pool.allocate(2, MAX_BLOCK_TIME); + pool.allocate(2, maxBlockTimeMs); fail("The buffer allocated more memory than its maximum value 2"); } catch (TimeoutException e) { // this is good @@ -169,7 +169,7 @@ public class BufferPoolTest { public static class StressTestThread extends Thread { private final int iterations; private final BufferPool pool; - private final long MAX_BLOCK_TIME_MS = 2000; + private final long maxBlockTimeMs = 2000; public final AtomicBoolean success = new AtomicBoolean(false); public StressTestThread(BufferPool pool, int iterations) { @@ -187,7 +187,7 @@ public class BufferPoolTest { else // allocate a random size size = TestUtils.RANDOM.nextInt((int) pool.totalMemory()); - ByteBuffer buffer = pool.allocate(size, MAX_BLOCK_TIME_MS); + ByteBuffer buffer = pool.allocate(size, maxBlockTimeMs); pool.deallocate(buffer); } success.set(true); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index e24f972..1790182 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -66,7 +65,7 @@ public class RecordAccumulatorTest { private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); - private final long MAX_BLOCK_TIME = 1000; + private final long maxBlockTimeMs = 1000; @Test public void testFull() throws Exception { @@ -74,10 +73,10 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, null, MAX_BLOCK_TIME); + accum.append(tp1, key, value, null, maxBlockTimeMs); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } - accum.append(tp1, key, value, null, MAX_BLOCK_TIME); + accum.append(tp1, key, value, null, maxBlockTimeMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); @@ -96,7 
+95,7 @@ public class RecordAccumulatorTest { public void testAppendLarge() throws Exception { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags); - accum.append(tp1, key, new byte[2 * batchSize], null, MAX_BLOCK_TIME); + accum.append(tp1, key, new byte[2 * batchSize], null, maxBlockTimeMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @@ -104,7 +103,7 @@ public class RecordAccumulatorTest { public void testLinger() throws Exception { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); - accum.append(tp1, key, value, null, MAX_BLOCK_TIME); + accum.append(tp1, key, value, null, maxBlockTimeMs); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -126,7 +125,7 @@ public class RecordAccumulatorTest { List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) - accum.append(tp, key, value, null, MAX_BLOCK_TIME); + accum.append(tp, key, value, null, maxBlockTimeMs); } assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -147,7 +146,7 @@ public class RecordAccumulatorTest { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition(topic, i % numParts), key, value, null, MAX_BLOCK_TIME); + accum.append(new TopicPartition(topic, i % numParts), key, value, null, maxBlockTimeMs); } catch (Exception e) { e.printStackTrace(); } @@ -187,7 +186,7 @@ public class RecordAccumulatorTest { // Partition on node1 only for (int i = 0; i < appends; i++) - accum.append(tp1, key, value, null, MAX_BLOCK_TIME); + accum.append(tp1, key, value, null, maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); @@ -196,14 +195,14 @@ public class RecordAccumulatorTest { // Add partition on node2 only for (int i = 0; i < appends; i++) - accum.append(tp3, key, value, null, MAX_BLOCK_TIME); + accum.append(tp3, key, value, null, maxBlockTimeMs); result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) - accum.append(tp2, key, value, null, MAX_BLOCK_TIME); + accum.append(tp2, key, value, null, maxBlockTimeMs); result = accum.ready(cluster, time.milliseconds()); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable @@ -218,7 +217,7 @@ public class RecordAccumulatorTest { final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, 
metrics, time, metricTags); long now = time.milliseconds(); - accum.append(tp1, key, value, null, MAX_BLOCK_TIME); + accum.append(tp1, key, value, null, maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); Map> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); @@ -230,7 +229,7 @@ public class RecordAccumulatorTest { accum.reenqueue(batches.get(0).get(0), now); // Put message for partition 1 into accumulator - accum.append(tp2, key, value, null, MAX_BLOCK_TIME); + accum.append(tp2, key, value, null, maxBlockTimeMs); result = accum.ready(cluster, now + lingerMs + 1); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); @@ -254,7 +253,7 @@ public class RecordAccumulatorTest { long lingerMs = Long.MAX_VALUE; final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, null, MAX_BLOCK_TIME); + accum.append(new TopicPartition(topic, i % 3), key, value, null, maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -285,7 +284,7 @@ public class RecordAccumulatorTest { } } for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback(), MAX_BLOCK_TIME); + accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback(), maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -299,19 +298,20 @@ public class RecordAccumulatorTest { public void testExpiredBatches() throws InterruptedException { Time time = new SystemTime(); long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, null, MAX_BLOCK_TIME); + accum.append(tp1, key, value, null, maxBlockTimeMs); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } time.sleep(2000); + accum.ready(cluster, now); accum.append(tp1, key, value, null, 0); Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); - Cluster cluster = new Cluster(Arrays.asList(), Arrays.asList()); + Cluster cluster = new Cluster(new ArrayList(), new ArrayList()); now = time.milliseconds(); - List batches = accum.abortExpiredBatches(0, cluster, now); + List batches = accum.abortExpiredBatches(60, cluster, now); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); batch.records.close(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 3a684d9..ff7208f 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ 
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -92,7 +92,11 @@ public class SelectorTest { String node = "0"; blockingConnect(node); selector.disconnect(node); - selector.send(createSend(node, "hello1")); + try { + selector.send(createSend(node, "hello1")); + } catch (IllegalStateException e) { + // Attempted to write to the socket for which there was no open connection + } selector.poll(10); assertEquals("Request should not have succeeded", 0, selector.completedSends().size()); assertEquals("There should be a disconnect", 1, selector.disconnected().size()); diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index 0335cc6..55093c0 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -184,7 +184,6 @@ object ProducerPerformance extends Logging { props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 1198df0..e90818a 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -144,49 +144,6 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { } /** - * 1. With ack=0, the future metadata should not be blocked. - * 2. With ack=1, the future metadata should block, - * and subsequent calls will eventually cause buffer full - */ - @Test - def testNoResponse() { - // create topic - TestUtils.createTopic(zkClient, topic1, 1, numServers, servers) - - // first send a message to make sure the metadata is refreshed - val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) - producer1.send(record1).get - producer2.send(record1).get - - // stop IO threads and request handling, but leave networking operational - // any requests should be accepted and queue up, but not handled - servers.foreach(server => server.requestHandlerPool.shutdown()) - - producer1.send(record1).get(5000, TimeUnit.MILLISECONDS) - - intercept[TimeoutException] { - producer2.send(record1).get(5000, TimeUnit.MILLISECONDS) - } - - // TODO: expose producer configs after creating them - // send enough messages to get buffer full - val tooManyRecords = 10 - val msgSize = producerBufferSize / tooManyRecords - val value = new Array[Byte](msgSize) - new Random().nextBytes(value) - val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, value) - - intercept[KafkaException] { - for (i <- 1 to tooManyRecords) - producer2.send(record2) - } - - // do not close produce2 since it will block - // TODO: can we do better? 
- producer2 = null - } - - /** * The send call with invalid partition id should throw KafkaException caused by IllegalArgumentException */ @Test @@ -287,7 +244,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { } catch { case e: ExecutionException => if (!e.getCause.isInstanceOf[NotEnoughReplicasException] && - !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException]) { + !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] && + !e.getCause.isInstanceOf[TimeoutException]) { fail("Expected NotEnoughReplicasException or NotEnoughReplicasAfterAppendException when producing to topic " + "with fewer brokers than min.insync.replicas, but saw " + e.getCause) } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 09b8444..ff85bf5 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -410,7 +410,6 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) - producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") -- 1.7.12.4 From 6ddd867c03e6fbe4a5d4743858be1966bd075c30 Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Wed, 12 Aug 2015 10:57:07 -0700 Subject: [PATCH 5/7] Addressed Jason's comments about the default values for requestTimeout --- .../src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java | 2 +- .../src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index a50f056..dad0bc0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -304,7 +304,7 @@ public class ConsumerConfig extends AbstractConfig { .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, - 10 * 1000, + 40 * 1000, atLeast(0), Importance.MEDIUM, REQUEST_TIMEOUT_MS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index b961290..27c134b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -241,7 +241,7 @@ public class ProducerConfig extends AbstractConfig { MAX_BLOCK_MS_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, - 10 * 1000, + 30 * 1000, atLeast(0), Importance.MEDIUM, REQUEST_TIMEOUT_MS_DOC) -- 1.7.12.4 From 5abaff4c4bbc66392998dcaf9a90edc00f9451a0 Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Mon, 31 Aug 2015 18:43:49 -0700 Subject: [PATCH 6/7] checkpoint --- .../org/apache/kafka/clients/InFlightRequests.java | 20 +++++++++--------- 
.../kafka/clients/producer/KafkaProducer.java | 24 +++++++++++++++++----- .../kafka/clients/producer/ProducerConfig.java | 15 ++++++++------ .../clients/producer/internals/BufferPool.java | 1 + .../producer/internals/RecordAccumulator.java | 2 ++ .../clients/producer/internals/RecordBatch.java | 4 ++-- .../kafka/common/network/SSLSelectorTest.java | 6 +++++- .../scala/kafka/tools/ProducerPerformance.scala | 3 +-- 8 files changed, 49 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index c919df5..6e918fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -127,25 +127,25 @@ final class InFlightRequests { } /** - * Returns a list of nodes, that have pending inflight request, that can be timed out + * Returns a list of nodes with pending in-flight requests that need to be timed out * - * @param now current time in milliseconds + * @param now current time in milliseconds * @param requestTimeout max time to wait for the request to be completed * @return list of nodes */ public Iterable getNodesWithTimedOutRequests(long now, int requestTimeout) { List nodeIds = new LinkedList(); - if (!requests.isEmpty()) { - for (String nodeId : requests.keySet()) { - if (inFlightRequestCount(nodeId) > 0) { - ClientRequest request = lastSent(nodeId); - long timeSinceSend = now - request.getSendMs(); - if (timeSinceSend > requestTimeout) { - nodeIds.add(nodeId); - } +// if (!requests.isEmpty()) { + for (String nodeId : requests.keySet()) { + if (inFlightRequestCount(nodeId) > 0) { + ClientRequest request = lastSent(nodeId); + long timeSinceSend = now - request.getSendMs(); + if (timeSinceSend > requestTimeout) { + nodeIds.add(nodeId); } } } +// } return nodeIds; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e141bcd..b68bd41 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -416,6 +416,7 @@ public class KafkaProducer implements Producer { " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer"); } + long remainingTime = getRemainingTime(startTime); byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.value()); @@ -424,17 +425,20 @@ public class KafkaProducer implements Producer { " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); } + remainingTime = getRemainingTime(startTime); int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); + remainingTime = getRemainingTime(startTime); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); // check remaining blocking time - long elapsedTime = time.milliseconds() - startTime; - if (elapsedTime > maxBlockTimeMs) { - throw new TimeoutException("Request timed out"); - } - long remainingTime = maxBlockTimeMs -
elapsedTime; +// long elapsedTime = time.milliseconds() - startTime; +// if (elapsedTime > maxBlockTimeMs) { +// throw new TimeoutException("Request timed out"); +// } +// long remainingTime = maxBlockTimeMs - elapsedTime; + remainingTime = getRemainingTime(startTime); RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); @@ -678,6 +682,16 @@ public class KafkaProducer implements Producer { cluster); } + private long getRemainingTime(long startTime) { + long elapsedTime = time.milliseconds() - startTime; + if (elapsedTime > maxBlockTimeMs) { + throw new TimeoutException("Request timed out"); + } + long remainingTime = maxBlockTimeMs - elapsedTime; + + return remainingTime; + } + private static class FutureFailure implements Future { private final ExecutionException exception; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 27c134b..93c1de8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -97,8 +97,9 @@ public class ProducerConfig extends AbstractConfig { + " remains alive. This is the strongest available guarantee."; /** timeout.ms */ + /** - *@deprecated This config will be removed soon. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG} + * @deprecated This config will be removed soon. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG} */ @Deprecated public static final String TIMEOUT_CONFIG = "timeout.ms"; @@ -137,8 +138,8 @@ public class ProducerConfig extends AbstractConfig { /** block.on.buffer.full */ /** - *@deprecated This config will be removed soon. Currently, if this property is set to true, the METADATA_FETCH_TIMEOUT_CONFIG will not be honored. - *Please use {@link #MAX_BLOCK_MS_CONFIG}. + * @deprecated This config will be removed soon. Currently, if this property is set to true, the METADATA_FETCH_TIMEOUT_CONFIG will not be honored. + * Please use {@link #MAX_BLOCK_MS_CONFIG}. */ @Deprecated public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; @@ -192,9 +193,11 @@ public class ProducerConfig extends AbstractConfig { /** max.block.ms */ public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms"; - private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} will block." - + "The send method can be blocked for multiple reasons. " - + "For e.g: buffer full, metadata unavailable etc."; + private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} and {@link KafkaProducer#partitionsFor} will block." + + "These methods can be blocked for multiple reasons. For e.g: buffer full, metadata unavailable." + + "This configuration imposes maximum limit on the total time spent in fetching metadata, serialization of key and value, partitioning and " + + "allocation of buffer memory when doing a send(). 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index b98e91f..2a45075 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -86,6 +86,7 @@ public final class BufferPool {
      * is configured with blocking mode.
      *
      * @param size The buffer size to allocate in bytes
+     * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      * @return The buffer
      * @throws InterruptedException If the thread is interrupted while blocked
      * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a74b62b..10cd6cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -151,6 +151,7 @@ public final class RecordAccumulator {
      * @param key The key for the record
      * @param value The value for the record
      * @param callback The user-supplied callback to execute when the request is complete
+     * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
      */
     public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
         // We keep track of the number of appending thread to make sure we do not miss batches in
@@ -240,6 +241,7 @@ public final class RecordAccumulator {
     public void reenqueue(RecordBatch batch, long now) {
         batch.attempts++;
         batch.lastAttemptMs = now;
+        batch.lastAppendTime = now;
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 4009eef..2dee36e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -42,8 +42,8 @@ public final class RecordBatch {
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
     public final ProduceRequestResult produceFuture;
+    public long lastAppendTime;
     private final List<Thunk> thunks;
-    private long lastAppendTime;
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
         this.createdMs = now;
@@ -127,7 +127,7 @@ public final class RecordBatch {
      */
     public boolean maybeExpire(int requestTimeout, long now, long lingerMs) {
         boolean expire = false;
-        if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.createdMs + lingerMs))) {
+        if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + lingerMs))) {
             expire = true;
             this.records.close();
             this.done(-1L, new TimeoutException("Batch Expired"));
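With lastAttemptMs in place of createdMs, the expiry clock restarts whenever a batch is re-enqueued for retry (reenqueue() above also refreshes lastAppendTime), so retried batches are no longer judged against their original creation time. The same condition, restated as a standalone predicate with the patch's fields as parameters:

    final class BatchExpiry {
        private BatchExpiry() { }

        // Same predicate as the new maybeExpire(): a full batch expires once more
        // than request.timeout.ms has passed since its last append; any other
        // batch expires request.timeout.ms after the linger window that follows
        // its last (re)attempt.
        static boolean shouldExpire(boolean batchIsFull, long nowMs, long lastAppendMs,
                                    long lastAttemptMs, long lingerMs, int requestTimeoutMs) {
            return (batchIsFull && requestTimeoutMs < nowMs - lastAppendMs)
                    || requestTimeoutMs < nowMs - (lastAttemptMs + lingerMs);
        }
    }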
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
index df1205c..0074152 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
@@ -110,7 +110,11 @@ public class SSLSelectorTest {
         String node = "0";
         blockingConnect(node);
         selector.disconnect(node);
-        selector.send(createSend(node, "hello1"));
+        try {
+            selector.send(createSend(node, "hello1"));
+        } catch (IllegalStateException e) {
+            // Attempted to write to the socket for which there was no open connection
+        }
         selector.poll(10L);
         assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
         assertEquals("There should be a disconnect", 1, selector.disconnected().size());
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 55093c0..e55474c 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -37,7 +37,6 @@ import org.apache.log4j.Logger
 
 object ProducerPerformance extends Logging {
 
   def main(args: Array[String]) {
-    val logger = Logger.getLogger(getClass)
     val config = new ProducerPerfConfig(args)
 
     if (!config.isFixedSize)
@@ -119,7 +118,7 @@ object ProducerPerformance extends Logging {
       .withRequiredArg
       .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
-    val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
+    val useNewProducerOpt = parser.accepts("new-producer2", "Use the new producer implementation.")
 
     val options = parser.parse(args: _*)
     CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt)
-- 
1.7.12.4
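Taken together, this series gives the producer two complementary bounds: max.block.ms caps client-side blocking in send()/partitionsFor(), while request.timeout.ms caps the wait for a broker response. A producer exercising both new configs might be set up as follows; the broker address and timeout values are illustrative only:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public final class TimeoutConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            // Bound client-side blocking in send()/partitionsFor() (this series).
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
            // Bound the wait for a broker response before retrying or failing.
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            producer.close();
        }
    }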
From 99ddfbcce7618e3b8b6aaf129b68e934158eb5ac Mon Sep 17 00:00:00 2001
From: Mayuresh Gharat
Date: Thu, 3 Sep 2015 15:09:19 -0700
Subject: [PATCH 7/7] Addressed Joel's concerns. Also tried to include Jun's feedback.

---
 .../main/java/org/apache/kafka/clients/InFlightRequests.java  |  2 --
 .../java/org/apache/kafka/clients/producer/KafkaProducer.java |  2 +-
 .../main/java/org/apache/kafka/common/network/Selector.java   | 10 ++++++----
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 6e918fb..1599f23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -135,7 +135,6 @@ final class InFlightRequests {
      */
     public Iterable<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
         List<String> nodeIds = new LinkedList<String>();
-//        if (!requests.isEmpty()) {
         for (String nodeId : requests.keySet()) {
             if (inFlightRequestCount(nodeId) > 0) {
                 ClientRequest request = lastSent(nodeId);
@@ -145,7 +144,6 @@ final class InFlightRequests {
                 }
             }
         }
-//        }
 
         return nodeIds;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b68bd41..7bb12a8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -220,7 +220,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
             /* check for user defined settings.
              * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
-             * This should be removed with release 0.9
+             * This should be removed with release 0.9 when the deprecated configs are removed.
              */
             if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
                 log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index eec738a..d60adda 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -179,8 +179,7 @@ public class Selector implements Selectable {
     }
 
     /**
-     * Disconnect any connections for the given id (if there are any). The disconnection is asynchronous and will not be
-     * processed until the next {@link #poll(long) poll()} call.
+     * Disconnect any connections for the given id (if there are any).
      */
     @Override
     public void disconnect(String id) {
@@ -355,6 +354,11 @@ public class Selector implements Selectable {
 
     @Override
     public List<String> disconnected() {
+        if (this.clientDisconnects.size() > 0) {
+            this.disconnected.addAll(this.clientDisconnects);
+            this.clientDisconnects.clear();
+        }
+
         return this.disconnected;
     }
@@ -425,9 +429,7 @@ public class Selector implements Selectable {
         this.connected.clear();
         this.disconnected.clear();
         this.disconnected.addAll(this.failedSends);
-        this.disconnected.addAll(this.clientDisconnects);
         this.failedSends.clear();
-        this.clientDisconnects.clear();
     }
 
     /**
-- 
1.7.12.4
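The net effect of the Selector change is that ids passed to disconnect(id) are staged in clientDisconnects and folded into the result at read time, instead of being drained by clear() at the start of the next poll, so callers of disconnected() observe client-initiated disconnects immediately. A minimal restatement of just that bookkeeping, not the full Selector:

    import java.util.ArrayList;
    import java.util.List;

    final class DisconnectBookkeeping {
        private final List<String> disconnected = new ArrayList<String>();
        private final List<String> clientDisconnects = new ArrayList<String>();

        // The real Selector.disconnect(id) also closes the channel; this sketch
        // keeps only the list bookkeeping that the hunk above changes.
        void disconnect(String id) {
            clientDisconnects.add(id);
        }

        // Client-initiated disconnects are folded in at read time, so they are
        // visible to callers even before the next poll()/clear() cycle runs.
        List<String> disconnected() {
            if (!clientDisconnects.isEmpty()) {
                disconnected.addAll(clientDisconnects);
                clientDisconnects.clear();
            }
            return disconnected;
        }
    }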