From 60283e7764086cd76ed36002869a246c70bdb9eb Mon Sep 17 00:00:00 2001 From: Mayuresh Gharat Date: Thu, 23 Jul 2015 21:23:08 -0700 Subject: [PATCH] Addressed Joel', Adi's, Dong's, Becket's comments --- .../org/apache/kafka/clients/ClientRequest.java | 8 +++ .../apache/kafka/clients/CommonClientConfigs.java | 6 ++ .../org/apache/kafka/clients/InFlightRequests.java | 32 ++++++++- .../java/org/apache/kafka/clients/KafkaClient.java | 2 +- .../org/apache/kafka/clients/NetworkClient.java | 27 ++++++-- .../kafka/clients/consumer/ConsumerConfig.java | 10 +++ .../kafka/clients/consumer/KafkaConsumer.java | 3 +- .../kafka/clients/producer/KafkaProducer.java | 65 +++++++++++++++--- .../kafka/clients/producer/ProducerConfig.java | 36 +++++++++- .../clients/producer/internals/BufferPool.java | 20 ++---- .../producer/internals/RecordAccumulator.java | 56 ++++++++++++---- .../clients/producer/internals/RecordBatch.java | 28 +++++++- .../kafka/clients/producer/internals/Sender.java | 21 ++++-- .../org/apache/kafka/common/network/Selector.java | 9 ++- .../java/org/apache/kafka/clients/MockClient.java | 2 +- .../apache/kafka/clients/NetworkClientTest.java | 31 +++++++-- .../clients/producer/internals/BufferPoolTest.java | 57 +++++++++------- .../producer/internals/RecordAccumulatorTest.java | 77 +++++++++++++++------- .../clients/producer/internals/SenderTest.java | 20 +++--- 19 files changed, 394 insertions(+), 116 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java index ed4c0d9..956aedb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -23,6 +23,7 @@ public final class ClientRequest { private final boolean expectResponse; private final RequestSend request; private final RequestCompletionHandler callback; + private long sendMs; /** * @param createdMs The unix timestamp in 
milliseconds for the time at which this request was created. @@ -63,4 +64,11 @@ public final class ClientRequest { return createdMs; } + public long getSendMs() { + return sendMs; + } + + public void setSendMs(long sendMs) { + this.sendMs = sendMs; + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 2c421f4..855b9ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -57,4 +57,10 @@ public class CommonClientConfigs { public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms"; public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config."; + + public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms"; + public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait " + + "for the response of a request. 
If the response is not received before the timeout " + + "elapses the client will resend the request if necessary or fail the request if " + + "retries are exhausted."; } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 15d00d4..64f7bf3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -16,7 +16,13 @@ import java.util.ArrayDeque; import java.util.Collections; import java.util.Deque; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; + /** * The set of requests which have been sent or are being sent but haven't yet received a response @@ -33,12 +39,13 @@ final class InFlightRequests { /** * Add the given request to the queue for the connection it was directed to */ - public void add(ClientRequest request) { + public void add(ClientRequest request, long now) { Deque reqs = this.requests.get(request.request().destination()); if (reqs == null) { reqs = new ArrayDeque(); this.requests.put(request.request().destination(), reqs); } + request.setSendMs(now); reqs.addFirst(request); } @@ -123,4 +130,27 @@ final class InFlightRequests { } } + /** + * Returns a list of nodes, that have pending inflight request, that can be timed out + * + * @param now current time in milliseconds + * @param requestTimeout max time to wait for the request to be completed + * @return list of nodes + */ + public Iterable getNodesWithTimedOutRequests(long now, int requestTimeout) { + List nodeIds = new LinkedList(); + if (requests.size() > 0) { + for (String nodeId : requests.keySet()) { + if (inFlightRequestCount(nodeId) > 0) { + ClientRequest request = lastSent(nodeId); + long 
timeSinceSend = now - request.getSendMs(); + if (timeSinceSend > requestTimeout) { + nodeIds.add(nodeId); + } + } + } + } + + return nodeIds; + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 7ab2503..a07e84e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -69,7 +69,7 @@ public interface KafkaClient extends Closeable { * * @param request The request */ - public void send(ClientRequest request); + public void send(ClientRequest request, long now); /** * Do actual reads and writes from sockets. diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 48fe796..82f5611 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -79,13 +79,17 @@ public class NetworkClient implements KafkaClient { /* the last timestamp when no broker node is available to connect */ private long lastNoNodeAvailableMs; + /* max time in ms for the producer to wait for acknowledgement from server*/ + private final int requestTimeout; + public NetworkClient(Selectable selector, Metadata metadata, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, int socketSendBuffer, - int socketReceiveBuffer) { + int socketReceiveBuffer, + int requestTimeoutMs) { this.selector = selector; this.metadata = metadata; this.clientId = clientId; @@ -97,6 +101,7 @@ public class NetworkClient implements KafkaClient { this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE); this.metadataFetchInProgress = false; this.lastNoNodeAvailableMs = 0; + this.requestTimeout = requestTimeoutMs; } /** @@ -189,12 +194,11 @@ public class NetworkClient implements KafkaClient { * 
@param request The request */ @Override - public void send(ClientRequest request) { + public void send(ClientRequest request, long now) { String nodeId = request.request().destination(); if (!isSendable(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); - - this.inFlightRequests.add(request); + this.inFlightRequests.add(request, now); selector.send(request.request()); } @@ -227,6 +231,7 @@ public class NetworkClient implements KafkaClient { List responses = new ArrayList(); handleCompletedSends(responses, now); handleCompletedReceives(responses, now); + handleTimedOutRequests(now); handleDisconnections(responses, now); handleConnections(); @@ -348,6 +353,18 @@ public class NetworkClient implements KafkaClient { } /** + * Iterate over all the inflight requests and expire any requests that have exceeded the configured the requestTimeout. + * The connection to the node associated with the request will be terminated and will be treated as a disconnection. It + * will be handled in handleDisconnections() subsequently. + */ + private void handleTimedOutRequests(long now) { + for (String nodeId : this.inFlightRequests.getNodesWithTimedOutRequests(/*currTime,*/now, this.requestTimeout)) { + // disconnect the connection to the node + this.selector.disconnect(nodeId); + } + } + + /** * Handle any completed request send. In particular if no response is expected consider the request complete. 
* * @param responses The list of responses to update @@ -479,7 +496,7 @@ public class NetworkClient implements KafkaClient { ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); this.selector.send(metadataRequest.request()); - this.inFlightRequests.add(metadataRequest); + this.inFlightRequests.add(metadataRequest, now); } else if (connectionStates.canConnect(nodeConnectionId, now)) { // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 70377ae..f582144 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -159,6 +159,10 @@ public class ConsumerConfig extends AbstractConfig { /** connections.max.idle.ms */ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; + /** request.timeout.ms */ + public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC ; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, @@ -282,6 +286,12 @@ public class ConsumerConfig extends AbstractConfig { Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) + .define(REQUEST_TIMEOUT_MS_CONFIG, + Type.INT, + 10 * 1000, + atLeast(0), + Importance.MEDIUM, + REQUEST_TIMEOUT_MS_DOC) /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index bea3d73..761406e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -534,7 +534,8 @@ public class KafkaConsumer implements Consumer { 100, // a fixed large enough value will suffice config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), - config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG)); + config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), + config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs); OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase()); this.subscriptions = new SubscriptionState(offsetResetStrategy); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 03b8dd2..e167b95 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -124,7 +124,6 @@ public class KafkaProducer implements Producer { private final Partitioner partitioner; private final int maxRequestSize; - private final long metadataFetchTimeoutMs; private final long totalMemorySize; private final Metadata metadata; private final RecordAccumulator accumulator; @@ -137,6 +136,8 @@ public class KafkaProducer implements Producer { private final Serializer keySerializer; private final Serializer valueSerializer; private final ProducerConfig producerConfig; + private final long maxBlockTimeMs; + private final int requestTimeoutMs; /** * A producer is instantiated 
by providing a set of key-value pairs as configuration. Valid configuration strings @@ -193,8 +194,10 @@ public class KafkaProducer implements Producer { private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer) { try { log.trace("Starting the Kafka producer"); + Map userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = new SystemTime(); + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); @@ -208,11 +211,45 @@ public class KafkaProducer implements Producer { this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); - this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + /* check for user defined settings. + * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG. + * This should be removed with release 0.9 + */ + if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) { + log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. 
" + + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); + boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG); + if (blockOnBufferFull) { + this.maxBlockTimeMs = Long.MAX_VALUE; + } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { + log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); + this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + } else { + this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); + } + } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { + log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); + this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); + } else { + this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); + } + + /* check for user defined settings. + * If the TIME_OUT config is set use that for request timeout. 
+ * This should be removed with release 0.9 + */ + if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) { + this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG); + } else { + this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); + } + Map metricTags = new LinkedHashMap(); metricTags.put("client-id", clientId); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), @@ -220,7 +257,6 @@ public class KafkaProducer implements Producer { this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, - config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time, metricTags); @@ -234,17 +270,18 @@ public class KafkaProducer implements Producer { config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), - config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), + this.requestTimeoutMs); this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), - config.getInt(ProducerConfig.TIMEOUT_CONFIG), this.metrics, new SystemTime(), - clientId); + clientId, + this.requestTimeoutMs); String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? 
" | " + clientId : ""); this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); @@ -363,7 +400,8 @@ public class KafkaProducer implements Producer { public Future send(ProducerRecord record, Callback callback) { try { // first make sure the metadata for the topic is available - waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); + long startTime = time.milliseconds(); + waitOnMetadata(record.topic(), this.maxBlockTimeMs); byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.key()); @@ -385,7 +423,13 @@ public class KafkaProducer implements Producer { ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback); + // check remaining blocking time + long elapsedTime = time.milliseconds() - startTime; + if (elapsedTime > maxBlockTimeMs) { + throw new TimeoutException("Request timed out"); + } + long remainingTime = maxBlockTimeMs - elapsedTime; + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); @@ -504,7 +548,7 @@ public class KafkaProducer implements Producer { @Override public List partitionsFor(String topic) { try { - waitOnMetadata(topic, this.metadataFetchTimeoutMs); + waitOnMetadata(topic, this.maxBlockTimeMs); } catch (InterruptedException e) { throw new InterruptException(e); } @@ -623,7 +667,8 @@ public class KafkaProducer implements Producer { + "]."); return partition; } - return this.partitioner.partition(record.topic(), 
record.key(), serializedKey, record.value(), serializedValue, cluster); + return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, + cluster); } private static class FutureFailure implements Future { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index aa26420..5f27889 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -44,6 +44,9 @@ public class ProducerConfig extends AbstractConfig { public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; /** metadata.fetch.timeout.ms */ + /** + * @deprecated This config will be removed soon. Please use {@link #MAX_BLOCK_MS_CONFIG} + */ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This " + "fetch to succeed before throwing an exception back to the client."; @@ -92,6 +95,10 @@ public class ProducerConfig extends AbstractConfig { + " remains alive. This is the strongest available guarantee."; /** timeout.ms */ + /** + *@deprecated This config will be removed soon. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG} + */ + @Deprecated public static final String TIMEOUT_CONFIG = "timeout.ms"; private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " + "meet the acknowledgment requirements the producer has specified with the acks configuration. If the " + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. 
This timeout " @@ -127,6 +134,11 @@ public class ProducerConfig extends AbstractConfig { public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG; /** block.on.buffer.full */ + /** + *@deprecated This config will be removed soon. Currently, if this property is set to true, the METADATA_FETCH_TIMEOUT_CONFIG will not be honored. + *Please use {@link #MAX_BLOCK_MS_CONFIG}. + */ + @Deprecated public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to " + "immediately give an error. Setting this to false will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full."; @@ -176,6 +188,16 @@ public class ProducerConfig extends AbstractConfig { public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the Partitioner interface."; + /** max.block.ms */ + public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms"; + private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} will block." + + "The send method can be blocked for multiple reasons. 
" + + "For e.g: buffer full, metadata unavailable etc."; + + /** request.timeout.ms */ + public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC ; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -199,7 +221,7 @@ public class ProducerConfig extends AbstractConfig { atLeast(0), Importance.MEDIUM, MAX_REQUEST_SIZE_DOC) - .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) + .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) @@ -209,6 +231,18 @@ public class ProducerConfig extends AbstractConfig { atLeast(0), Importance.LOW, METADATA_FETCH_TIMEOUT_DOC) + .define(MAX_BLOCK_MS_CONFIG, + Type.LONG, + 60 * 1000, + atLeast(0), + Importance.MEDIUM, + MAX_BLOCK_MS_DOC) + .define(REQUEST_TIMEOUT_MS_CONFIG, + Type.INT, + 10 * 1000, + atLeast(0), + Importance.MEDIUM, + REQUEST_TIMEOUT_MS_DOC) .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC) .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 4cb1e50..09e44fb 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -24,8 +24,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Rate; @@ -46,7 +46,6 @@ public final class BufferPool { private final long totalMemory; private final int poolableSize; - private final boolean blockOnExhaustion; private final ReentrantLock lock; private final Deque free; private final Deque waiters; @@ -60,17 +59,13 @@ public final class BufferPool { * * @param memory The maximum amount of memory that this buffer pool can allocate * @param poolableSize The buffer size to cache in the free list rather than deallocating - * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the - * {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false - * {@link #allocate(int)} will throw an exception if the buffer is out of memory. 
* @param metrics instance of Metrics * @param time time instance * @param metricGrpName logical group name for metrics * @param metricTags additional key/val attributes for metrics */ - public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time , String metricGrpName , Map metricTags) { + public BufferPool(long memory, int poolableSize, Metrics metrics, Time time , String metricGrpName , Map metricTags) { this.poolableSize = poolableSize; - this.blockOnExhaustion = blockOnExhaustion; this.lock = new ReentrantLock(); this.free = new ArrayDeque(); this.waiters = new ArrayDeque(); @@ -95,9 +90,8 @@ public final class BufferPool { * @throws InterruptedException If the thread is interrupted while blocked * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block * forever) - * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool */ - public ByteBuffer allocate(int size) throws InterruptedException { + public ByteBuffer allocate(int size, long maxTimeToBlock) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " @@ -120,10 +114,6 @@ public final class BufferPool { this.availableMemory -= size; lock.unlock(); return ByteBuffer.allocate(size); - } else if (!blockOnExhaustion) { - throw new BufferExhaustedException("You have exhausted the " + this.totalMemory - + " bytes of memory you configured for the client and the client is configured to error" - + " rather than block when memory is exhausted."); } else { // we are out of memory and will have to block int accumulated = 0; @@ -134,7 +124,9 @@ public final class BufferPool { // enough memory to allocate one while (accumulated < size) { long startWait = time.nanoseconds(); - moreMemory.await(); + if (!moreMemory.await(maxTimeToBlock, 
TimeUnit.MILLISECONDS)) { + throw new TimeoutException("Failed to allocate memory within the configured max blocking time"); + } long endWait = time.nanoseconds(); this.waitTime.record(endWait - startWait, time.milliseconds()); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index a152bd7..7d8cab4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -12,12 +12,14 @@ */ package org.apache.kafka.clients.producer.internals; +import java.util.Iterator; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -82,8 +84,6 @@ public final class RecordAccumulator { * latency for potentially better throughput due to more batching (and hence fewer, larger requests). * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids * exhausting all retries in a short period of time. 
- * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of - * memory * @param metrics The metrics * @param time The time instance to use * @param metricTags additional key/value attributes of the metric @@ -93,7 +93,6 @@ public final class RecordAccumulator { CompressionType compression, long lingerMs, long retryBackoffMs, - boolean blockOnBufferFull, Metrics metrics, Time time, Map metricTags) { @@ -107,7 +106,7 @@ public final class RecordAccumulator { this.retryBackoffMs = retryBackoffMs; this.batches = new CopyOnWriteMap>(); String metricGrpName = "producer-metrics"; - this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags); + this.free = new BufferPool(totalSize, batchSize, metrics, time , metricGrpName , metricTags); this.incomplete = new IncompleteRecordBatches(); this.time = time; registerMetrics(metrics, metricGrpName, metricTags); @@ -154,7 +153,7 @@ public final class RecordAccumulator { * @param value The value for the record * @param callback The user-supplied callback to execute when the request is complete */ - public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException { + public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException { // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). 
appendsInProgress.incrementAndGet(); @@ -166,23 +165,23 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch last = dq.peekLast(); if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback); - if (future != null) - return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); + FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); + if (future != null) + return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } // we don't have an in-progress record batch try to allocate a new batch int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); - ByteBuffer buffer = free.allocate(size); + ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordBatch last = dq.peekLast(); if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback); + FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... 
free.deallocate(buffer); @@ -191,7 +190,7 @@ } MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); - FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); + FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); @@ -203,6 +202,41 @@ } /** + * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout + * due to metadata being unavailable + */ + public List abortExpiredBatches(int requestTimeout, Cluster cluster, long now) { + List expiredBatches = new ArrayList(); + int count = 0; + for (Map.Entry> entry : this.batches.entrySet()) { + TopicPartition topicAndPartition = entry.getKey(); + Deque dq = entry.getValue(); + synchronized (dq) { + // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeout + Iterator batchIterator = dq.iterator(); + while (batchIterator.hasNext()) { + RecordBatch batch = batchIterator.next(); + Node leader = cluster.leaderFor(topicAndPartition); + if (batch != null && (leader == null)) { + // check if the batch is expired + if (batch.expire(requestTimeout, now)) { + expiredBatches.add(batch); + count++; + dq.remove(batch); + deallocate(batch); + } + } + } + } + } + if (expiredBatches.size() > 0) { + log.trace("Expired {} batches in accumulator", count); + } + + return expiredBatches; + } + + /** * Re-enqueue the given record batch in the accumulator to retry */ public void reenqueue(RecordBatch batch, long now) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 06182db..640a261 100644 ---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -18,8 +18,11 @@ import java.util.List; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +45,7 @@ public final class RecordBatch { public final TopicPartition topicPartition; public final ProduceRequestResult produceFuture; private final List thunks; + private long lastAppendTime; public RecordBatch(TopicPartition tp, MemoryRecords records, long now) { this.createdMs = now; @@ -50,6 +54,7 @@ public final class RecordBatch { this.topicPartition = tp; this.produceFuture = new ProduceRequestResult(); this.thunks = new ArrayList(); + this.lastAppendTime = createdMs; } /** @@ -57,12 +62,13 @@ public final class RecordBatch { * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. 
*/ - public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { + public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) { if (!this.records.hasRoomFor(key, value)) { return null; } else { this.records.append(0L, key, value); this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); + this.lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) thunks.add(new Thunk(callback, future)); @@ -116,4 +122,24 @@ public String toString() { return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")"; } + + /** + * Expire the batch that is ready but is sitting in accumulator for more than + * requestTimeout due to metadata being unavailable. + * + * @param requestTimeout the maximum time in ms the batch may remain unsent before expiring + * @param now the current time in ms + * @return true if the batch was expired and completed with a TimeoutException + * + */ + public boolean expire(int requestTimeout, long now) { + boolean expire = false; + if (requestTimeout < (now - this.lastAppendTime)) { + expire = true; + this.records.close(); + this.done(-1L, new TimeoutException("Batch Expired")); + } + + return expire; + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 0baf16e..98a6cf6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -71,9 +71,6 @@ public class Sender implements Runnable { /* the number of acknowledgements to request from the server */ private final short acks; - /* the max time in ms for the server to wait for acknowlegements */ - private final int requestTimeout; - /* the number of times to retry a failed request before giving up */ private final int retries; @@ -92,27 +89,30 @@ /* param clientId of
the client */ private String clientId; + /* the max time to wait for the server to respond to the request*/ + private final int requestTimeout; + public Sender(KafkaClient client, Metadata metadata, RecordAccumulator accumulator, int maxRequestSize, short acks, int retries, - int requestTimeout, Metrics metrics, Time time, - String clientId) { + String clientId, + int requestTimeout) { this.client = client; this.accumulator = accumulator; this.metadata = metadata; this.maxRequestSize = maxRequestSize; this.running = true; - this.requestTimeout = requestTimeout; this.acks = acks; this.retries = retries; this.time = time; this.clientId = clientId; this.sensors = new SenderMetrics(metrics); + this.requestTimeout = requestTimeout; } /** @@ -187,6 +187,13 @@ public class Sender implements Runnable { result.readyNodes, this.maxRequestSize, now); + + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now); + // update sensors + for (RecordBatch expiredBatch : expiredBatches) { + this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); + } + sensors.updateProduceRequestMetrics(batches); List requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately @@ -200,7 +207,7 @@ public class Sender implements Runnable { pollTimeout = 0; } for (ClientRequest request : requests) - client.send(request); + client.send(request, now); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index aaf60c9..a5cd9f2 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
@@ -83,6 +83,7 @@ public class Selector implements Selectable { private final long connectionsMaxIdleNanos; private final int maxReceiveSize; private final boolean metricsPerConnection; + private final List clientDisconnects; private long currentTimeNanos; private long nextIdleCloseCheckTime; @@ -113,6 +114,7 @@ public class Selector implements Selectable { currentTimeNanos = new SystemTime().nanoseconds(); nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; this.metricsPerConnection = metricsPerConnection; + this.clientDisconnects = new LinkedList(); } public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map metricTags) { @@ -177,7 +179,10 @@ public class Selector implements Selectable { public void disconnect(String id) { SelectionKey key = this.keys.get(id); if (key != null) - key.cancel(); + { + close(id); + this.clientDisconnects.add(id); + } } /** @@ -412,7 +417,9 @@ public class Selector implements Selectable { this.connected.clear(); this.disconnected.clear(); this.disconnected.addAll(this.failedSends); + this.disconnected.addAll(this.clientDisconnects); this.failedSends.clear(); + this.clientDisconnects.clear(); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index d9c97e9..de9c429 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -91,7 +91,7 @@ public class MockClient implements KafkaClient { } @Override - public void send(ClientRequest request) { + public void send(ClientRequest request, long now) { if (!futureResponses.isEmpty()) { FutureResponse futureResp = futureResponses.poll(); ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody); diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 43238ce..a3fcce2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -45,13 +45,14 @@ import org.junit.Test; public class NetworkClientTest { + private final int REQUEST_TIMEOUT = 1000; private MockTime time = new MockTime(); private MockSelector selector = new MockSelector(time); private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private int nodeId = 1; private Cluster cluster = TestUtils.singletonCluster("test", nodeId); private Node node = cluster.nodes().get(0); - private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024); + private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, REQUEST_TIMEOUT); @Before public void setup() { @@ -68,7 +69,8 @@ public class NetworkClientTest { selector.disconnect(node.idString()); client.poll(1, time.milliseconds()); selector.clear(); - assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); + assertFalse("After we forced the disconnection the client is no longer ready.", + client.ready(node, time.milliseconds())); assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0); } @@ -78,7 +80,7 @@ public class NetworkClientTest { client.nextRequestHeader(ApiKeys.METADATA), new MetadataRequest(Arrays.asList("test")).toStruct()); ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null); - client.send(request); + client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); } @@ -90,7 +92,7 @@ public class NetworkClientTest { TestCallbackHandler handler = new TestCallbackHandler(); ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler); 
awaitReady(client, node); - client.send(request); + client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); assertEquals(1, client.inFlightRequestCount()); ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId()); @@ -113,8 +115,25 @@ public class NetworkClientTest { while (!client.ready(node, time.milliseconds())) client.poll(1, time.milliseconds()); } - - private static class TestCallbackHandler implements RequestCompletionHandler { + + @Test + public void testRequestTimeout() { + ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap()); + RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE); + RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct()); + TestCallbackHandler handler = new TestCallbackHandler(); + ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler); + awaitReady(client, node); + long now = time.milliseconds(); + client.send(request, now); + // sleeping to make sure that the time since last send is greater than requestTimeOut + time.sleep(3000); + client.poll(3000, time.milliseconds()); + String disconnectedNode = selector.disconnected().get(0); + assertEquals(node.idString(), disconnectedNode); + } + + private static class TestCallbackHandler implements RequestCompletionHandler { public boolean executed = false; public ClientResponse response; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java index 2c69382..883b563 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; -import org.apache.kafka.clients.producer.BufferExhaustedException; +import 
org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; @@ -35,6 +35,7 @@ import static org.junit.Assert.*; public class BufferPoolTest { private MockTime time = new MockTime(); private Metrics metrics = new Metrics(time); + private final long MAX_BLOCK_TIME = 2000; String metricGroup = "TestMetrics"; Map metricTags = new LinkedHashMap(); @@ -45,8 +46,8 @@ public class BufferPoolTest { public void testSimple() throws Exception { long totalMemory = 64 * 1024; int size = 1024; - BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(size); + BufferPool pool = new BufferPool(totalMemory, size, metrics, time, metricGroup, metricTags); + ByteBuffer buffer = pool.allocate(size, MAX_BLOCK_TIME); assertEquals("Buffer size should equal requested size.", size, buffer.limit()); assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory()); assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory()); @@ -55,13 +56,13 @@ public class BufferPoolTest { pool.deallocate(buffer); assertEquals("All memory should be available", totalMemory, pool.availableMemory()); assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory()); - buffer = pool.allocate(size); + buffer = pool.allocate(size, MAX_BLOCK_TIME); assertEquals("Recycled buffer should be cleared.", 0, buffer.position()); assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit()); pool.deallocate(buffer); assertEquals("All memory should be available", totalMemory, pool.availableMemory()); assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory()); - buffer = pool.allocate(2 * size); + buffer = pool.allocate(2 * size, MAX_BLOCK_TIME); 
pool.deallocate(buffer); assertEquals("All memory should be available", totalMemory, pool.availableMemory()); assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory()); @@ -72,23 +73,11 @@ public class BufferPoolTest { */ @Test(expected = IllegalArgumentException.class) public void testCantAllocateMoreMemoryThanWeHave() throws Exception { - BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(1024); + BufferPool pool = new BufferPool(1024, 512, metrics, time, metricGroup, metricTags); + ByteBuffer buffer = pool.allocate(1024, MAX_BLOCK_TIME); assertEquals(1024, buffer.limit()); pool.deallocate(buffer); - buffer = pool.allocate(1025); - } - - @Test - public void testNonblockingMode() throws Exception { - BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags); - pool.allocate(1); - try { - pool.allocate(2); - fail("The buffer allocated more than it has!"); - } catch (BufferExhaustedException e) { - // this is good - } + buffer = pool.allocate(1025, MAX_BLOCK_TIME); } /** @@ -96,8 +85,8 @@ public class BufferPoolTest { */ @Test public void testDelayedAllocation() throws Exception { - BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(1024); + BufferPool pool = new BufferPool(5 * 1024, 1024, metrics, time, metricGroup, metricTags); + ByteBuffer buffer = pool.allocate(1024, MAX_BLOCK_TIME); CountDownLatch doDealloc = asyncDeallocate(pool, buffer); CountDownLatch allocation = asyncAllocate(pool, 5 * 1024); assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount()); @@ -126,7 +115,7 @@ public class BufferPoolTest { Thread thread = new Thread() { public void run() { try { - pool.allocate(size); + pool.allocate(size, MAX_BLOCK_TIME); } catch (InterruptedException e) { e.printStackTrace(); } finally { 
@@ -139,6 +128,23 @@ public class BufferPoolTest { } /** + * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time + * + * @throws Exception + */ + @Test + public void testBlockTimeout() throws Exception { + BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup, metricTags); + pool.allocate(1, MAX_BLOCK_TIME); + try { + pool.allocate(2, MAX_BLOCK_TIME); + fail("The buffer allocated more memory than its maximum value 2"); + } catch (TimeoutException e) { + // this is good + } + } + + /** * This test creates lots of threads that hammer on the pool */ @Test @@ -147,7 +153,7 @@ public class BufferPoolTest { final int iterations = 50000; final int poolableSize = 1024; final long totalMemory = numThreads / 2 * poolableSize; - final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags); + final BufferPool pool = new BufferPool(totalMemory, poolableSize, metrics, time, metricGroup, metricTags); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) threads.add(new StressTestThread(pool, iterations)); @@ -163,6 +169,7 @@ public class BufferPoolTest { public static class StressTestThread extends Thread { private final int iterations; private final BufferPool pool; + private final long MAX_BLOCK_TIME_MS = 2000; public final AtomicBoolean success = new AtomicBoolean(false); public StressTestThread(BufferPool pool, int iterations) { @@ -180,7 +187,7 @@ public class BufferPoolTest { else // allocate a random size size = TestUtils.RANDOM.nextInt((int) pool.totalMemory()); - ByteBuffer buffer = pool.allocate(size); + ByteBuffer buffer = pool.allocate(size, MAX_BLOCK_TIME_MS); pool.deallocate(buffer); } success.set(true); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 5b2e4ff..e24f972 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -40,6 +41,8 @@ import org.apache.kafka.common.record.LogEntry; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; import org.junit.Test; public class RecordAccumulatorTest { @@ -63,17 +66,18 @@ public class RecordAccumulatorTest { private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); + private final long MAX_BLOCK_TIME = 1000; @Test public void testFull() throws Exception { long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), 
accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); @@ -91,16 +95,16 @@ public class RecordAccumulatorTest { @Test public void testAppendLarge() throws Exception { int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, new byte[2 * batchSize], null); + RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags); + accum.append(tp1, key, new byte[2 * batchSize], null, MAX_BLOCK_TIME); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, value, null); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -117,12 +121,12 @@ public class RecordAccumulatorTest { @Test public void testPartialDrain() throws Exception { - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); int appends = 1024 / msgSize + 1; List 
partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) - accum.append(tp, key, value, null); + accum.append(tp, key, value, null, MAX_BLOCK_TIME); } assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -136,14 +140,14 @@ public class RecordAccumulatorTest { final int numThreads = 5; final int msgs = 10000; final int numParts = 2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time, metricTags); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition(topic, i % numParts), key, value, null); + accum.append(new TopicPartition(topic, i % numParts), key, value, null, MAX_BLOCK_TIME); } catch (Exception e) { e.printStackTrace(); } @@ -177,13 +181,13 @@ public class RecordAccumulatorTest { public void testNextReadyCheckDelay() throws Exception { // Next check time will use lingerMs since this test won't trigger any retries/backoff long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); // Just short of going over the limit so we trigger linger time int appends = 1024 / msgSize; // Partition on node1 only for (int i = 0; i < appends; i++) - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, 
result.readyNodes.size()); assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); @@ -192,14 +196,14 @@ public class RecordAccumulatorTest { // Add partition on node2 only for (int i = 0; i < appends; i++) - accum.append(tp3, key, value, null); + accum.append(tp3, key, value, null, MAX_BLOCK_TIME); result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) - accum.append(tp2, key, value, null); + accum.append(tp2, key, value, null, MAX_BLOCK_TIME); result = accum.ready(cluster, time.milliseconds()); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable @@ -211,10 +215,10 @@ public class RecordAccumulatorTest { public void testRetryBackoff() throws Exception { long lingerMs = Long.MAX_VALUE / 4; long retryBackoffMs = Long.MAX_VALUE / 2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, metricTags); long now = time.milliseconds(); - accum.append(tp1, key, value, null); + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); Map> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); @@ -226,7 +230,7 @@ public class 
RecordAccumulatorTest { accum.reenqueue(batches.get(0).get(0), now); // Put message for partition 1 into accumulator - accum.append(tp2, key, value, null); + accum.append(tp2, key, value, null, MAX_BLOCK_TIME); result = accum.ready(cluster, now + lingerMs + 1); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); @@ -248,9 +252,9 @@ public class RecordAccumulatorTest { @Test public void testFlush() throws Exception { long lingerMs = Long.MAX_VALUE; - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, null); + accum.append(new TopicPartition(topic, i % 3), key, value, null, MAX_BLOCK_TIME); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -272,7 +276,7 @@ public class RecordAccumulatorTest { public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -281,7 +285,7 @@ public class RecordAccumulatorTest { } } for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback()); + accum.append(new TopicPartition(topic, i % 3), key, value, 
new TestCallback(), MAX_BLOCK_TIME); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -291,4 +295,33 @@ public class RecordAccumulatorTest { } + @Test + public void testExpiredBatches() throws InterruptedException { + Time time = new SystemTime(); + long now = time.milliseconds(); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time, metricTags); + int appends = 1024 / msgSize; + for (int i = 0; i < appends; i++) { + accum.append(tp1, key, value, null, MAX_BLOCK_TIME); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); + } + time.sleep(2000); + accum.append(tp1, key, value, null, 0); + Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + Cluster cluster = new Cluster(Arrays.asList(), Arrays.asList()); + now = time.milliseconds(); + List batches = accum.abortExpiredBatches(0, cluster, now); + assertEquals(1, batches.size()); + RecordBatch batch = batches.get(0); + batch.records.close(); + batch.records.flip(); + Iterator iter = batch.records.iterator(); + for (int i = 0; i < appends; i++) { + LogEntry entry = iter.next(); + assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); + assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value()); + } + assertFalse("No more records", iter.hasNext()); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 8b1805d..62fefb9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -42,7 
+42,9 @@ public class SenderTest { private static final int MAX_REQUEST_SIZE = 1024 * 1024; private static final short ACKS_ALL = -1; private static final int MAX_RETRIES = 0; - private static final int REQUEST_TIMEOUT_MS = 10000; + private static final int TIMEOUT_MS = 10000; + private static final int MAX_BLOCK_TIMEOUT = 1000; + private static final int REQUEST_TIMEOUT = 1000; private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new MockTime(); @@ -52,17 +54,17 @@ public class SenderTest { private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); - private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags); + private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, metricTags); private Sender sender = new Sender(client, metadata, this.accumulator, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, - REQUEST_TIMEOUT_MS, metrics, time, - "clientId"); + "clientId", + REQUEST_TIMEOUT); @Before public void setup() { @@ -72,7 +74,7 @@ public class SenderTest { @Test public void testSimple() throws Exception { long offset = 0; - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); @@ -94,12 +96,12 @@ public class SenderTest { MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - REQUEST_TIMEOUT_MS, new Metrics(), time, - "clientId"); + "clientId", + REQUEST_TIMEOUT); // do a successful retry - Future future = accumulator.append(tp, "key".getBytes(), 
"value".getBytes(), null).future; + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals(1, client.inFlightRequestCount()); @@ -116,7 +118,7 @@ public class SenderTest { assertEquals(offset, future.get().offset()); // do an unsuccessful retry - future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future; + future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send produce request for (int i = 0; i < maxRetries + 1; i++) { client.disconnect(client.requests().peek().request().destination()); -- 1.7.12.4