Index: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (date 1438901563000) +++ clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (revision ) @@ -12,6 +12,7 @@ */ package org.apache.kafka.clients.producer; +import java.io.File; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.ExecutionException; @@ -213,6 +214,10 @@ this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + boolean enableBufferBackingFile = config.getBoolean(ProducerConfig.FILE_BACKED_BUFFERS_CONFIG); + File backingFile = enableBufferBackingFile ? new File(config.getString(ProducerConfig.FILE_BACKED_BUFFERS_FILE_NAME_CONFIG)) : null; + + Map metricTags = new LinkedHashMap(); metricTags.put("client-id", clientId); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), @@ -220,6 +225,7 @@ this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, + backingFile, config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time, Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/ByteBufferPool.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/clients/producer/internals/ByteBufferPool.java (revision ) +++ clients/src/main/java/org/apache/kafka/clients/producer/internals/ByteBufferPool.java (revision ) @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.kafka.clients.producer.BufferExhaustedException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.utils.Time; + + +/** + * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. 
In + * particular it has the following properties: + *
    + *
  1. There is a special "poolable size" and buffers of this size are kept in a free list and recycled + *
  2. It is fair. That is all memory is given to the longest waiting thread until it has sufficient memory. This + * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple + * buffers are deallocated. + *
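+ * <p>
+ * A minimal usage sketch (sizes are illustrative; metrics, time and metricTags are assumed to be in scope):
+ * <pre>
+ * BufferPool pool = new ByteBufferPool(64 * 1024 * 1024, 16384, true, metrics, time, "producer-metrics", metricTags);
+ * ByteBuffer buffer = pool.allocate(16384); // blocks when memory is exhausted and blocking is enabled
+ * try {
+ *     // ... fill the buffer with records ...
+ * } finally {
+ *     pool.deallocate(buffer); // poolable-size buffers are recycled on the free list
+ * }
+ * </pre>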
+ */ +public final class ByteBufferPool implements BufferPool { + + private final long totalMemory; + private final int poolableSize; + private final boolean blockOnExhaustion; + private final ReentrantLock lock; + private final Deque free; + private final Deque waiters; + private long availableMemory; + private final Metrics metrics; + private final Time time; + private final Sensor waitTime; + + /** + * Create a new buffer pool + * + * @param memory The maximum amount of memory that this buffer pool can allocate + * @param poolableSize The buffer size to cache in the free list rather than deallocating + * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the + * {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false + * {@link #allocate(int)} will throw an exception if the buffer is out of memory. + * @param metrics instance of Metrics + * @param time time instance + * @param metricGrpName logical group name for metrics + * @param metricTags additional key/val attributes for metrics + */ + public ByteBufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time , String metricGrpName , Map metricTags) { + this.poolableSize = poolableSize; + this.blockOnExhaustion = blockOnExhaustion; + this.lock = new ReentrantLock(); + this.free = new ArrayDeque(); + this.waiters = new ArrayDeque(); + this.totalMemory = memory; + this.availableMemory = memory; + this.metrics = metrics; + this.time = time; + this.waitTime = this.metrics.sensor("bufferpool-wait-time"); + MetricName metricName = new MetricName("bufferpool-wait-ratio", + metricGrpName, + "The fraction of time an appender waits for space allocation.", + metricTags); + this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); + } + + @Override + public ByteBuffer allocate(int size) throws InterruptedException { + if (size > this.totalMemory) + throw new IllegalArgumentException("Attempt to allocate " + size + + " bytes, but there is a hard limit of " + + this.totalMemory + + " on memory allocations."); + + this.lock.lock(); + try { + // check if we have a free buffer of the right size pooled + if (size == poolableSize && !this.free.isEmpty()) + return this.free.pollFirst(); + + // now check if the request is immediately satisfiable with the + // memory on hand or if we need to block + int freeListSize = this.free.size() * this.poolableSize; + if (this.availableMemory + freeListSize >= size) { + // we have enough unallocated or pooled memory to immediately + // satisfy the request + freeUp(size); + this.availableMemory -= size; + lock.unlock(); + return ByteBuffer.allocate(size); + } else if (!blockOnExhaustion) { + throw new BufferExhaustedException("You have exhausted the " + this.totalMemory + + " bytes of memory you configured for the client and the client is configured to error" + + " rather than block when memory is exhausted."); + } else { + // we are out of memory and will have to block + int accumulated = 0; + ByteBuffer buffer = null; + Condition moreMemory = this.lock.newCondition(); + this.waiters.addLast(moreMemory); + // loop over and over until we have a buffer or have reserved + // enough memory to allocate one + while (accumulated < size) { + long startWait = time.nanoseconds(); + moreMemory.await(); + long endWait = time.nanoseconds(); + this.waitTime.record(endWait - startWait, time.milliseconds()); + + // check if we can satisfy this request from the free list, + // otherwise allocate memory + if 
(accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { + // just grab a buffer from the free list + buffer = this.free.pollFirst(); + accumulated = size; + } else { + // we'll need to allocate memory, but we may only get + // part of what we need on this iteration + freeUp(size - accumulated); + int got = (int) Math.min(size - accumulated, this.availableMemory); + this.availableMemory -= got; + accumulated += got; + } + } + + // remove the condition for this thread to let the next thread + // in line start getting memory + Condition removed = this.waiters.removeFirst(); + if (removed != moreMemory) + throw new IllegalStateException("Wrong condition: this shouldn't happen."); + + // signal any additional waiters if there is more memory left + // over for them + if (this.availableMemory > 0 || !this.free.isEmpty()) { + if (!this.waiters.isEmpty()) + this.waiters.peekFirst().signal(); + } + + // unlock and return the buffer + lock.unlock(); + if (buffer == null) + return ByteBuffer.allocate(size); + else + return buffer; + } + } finally { + if (lock.isHeldByCurrentThread()) + lock.unlock(); + } + } + + /** + * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled + * buffers (if needed) + */ + private void freeUp(int size) { + while (!this.free.isEmpty() && this.availableMemory < size) + this.availableMemory += this.free.pollLast().capacity(); + } + + @Override + public void deallocate(ByteBuffer buffer, int size) { + lock.lock(); + try { + if (size == this.poolableSize && size == buffer.capacity()) { + buffer.clear(); + this.free.add(buffer); + } else { + this.availableMemory += size; + } + Condition moreMem = this.waiters.peekFirst(); + if (moreMem != null) + moreMem.signal(); + } finally { + lock.unlock(); + } + } + + @Override + public void deallocate(ByteBuffer buffer) { + deallocate(buffer, buffer.capacity()); + } + + @Override + public long availableMemory() { + lock.lock(); + try { + return this.availableMemory + this.free.size() * this.poolableSize; + } finally { + lock.unlock(); + } + } + + @Override + public long unallocatedMemory() { + lock.lock(); + try { + return this.availableMemory; + } finally { + lock.unlock(); + } + } + + @Override + public int queued() { + lock.lock(); + try { + return this.waiters.size(); + } finally { + lock.unlock(); + } + } + + @Override + public int poolableSize() { + return this.poolableSize; + } + + @Override + public long totalMemory() { + return this.totalMemory; + } +} \ No newline at end of file Index: clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java (date 1438901563000) +++ clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java (revision ) @@ -45,7 +45,7 @@ public void testSimple() throws Exception { long totalMemory = 64 * 1024; int size = 1024; - BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags); + BufferPool pool = new ByteBufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags); ByteBuffer buffer = pool.allocate(size); assertEquals("Buffer size should equal requested size.", size, buffer.limit()); assertEquals("Unallocated memory should have shrunk", totalMemory - size, 
pool.unallocatedMemory()); @@ -72,7 +72,7 @@ */ @Test(expected = IllegalArgumentException.class) public void testCantAllocateMoreMemoryThanWeHave() throws Exception { - BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags); + BufferPool pool = new ByteBufferPool(1024, 512, true, metrics, time, metricGroup, metricTags); ByteBuffer buffer = pool.allocate(1024); assertEquals(1024, buffer.limit()); pool.deallocate(buffer); @@ -81,7 +81,7 @@ @Test public void testNonblockingMode() throws Exception { - BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags); + BufferPool pool = new ByteBufferPool(2, 1, false, metrics, time, metricGroup, metricTags); pool.allocate(1); try { pool.allocate(2); @@ -96,7 +96,7 @@ */ @Test public void testDelayedAllocation() throws Exception { - BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags); + BufferPool pool = new ByteBufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags); ByteBuffer buffer = pool.allocate(1024); CountDownLatch doDealloc = asyncDeallocate(pool, buffer); CountDownLatch allocation = asyncAllocate(pool, 5 * 1024); @@ -147,7 +147,7 @@ final int iterations = 50000; final int poolableSize = 1024; final long totalMemory = numThreads / 2 * poolableSize; - final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags); + final BufferPool pool = new ByteBufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) threads.add(new StressTestThread(pool, iterations)); Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (date 1438901563000) +++ clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (revision ) @@ -17,74 +17,12 @@ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.kafka.clients.producer.BufferExhaustedException; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.utils.Time; - - /** - * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In - * particular it has the following properties: - *
    - *
  1. There is a special "poolable size" and buffers of this size are kept in a free list and recycled - *
  2. It is fair. That is all memory is given to the longest waiting thread until it has sufficient memory. This - * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple - * buffers are deallocated. - *
- */ -public final class BufferPool { - - private final long totalMemory; - private final int poolableSize; - private final boolean blockOnExhaustion; - private final ReentrantLock lock; - private final Deque free; - private final Deque waiters; - private long availableMemory; - private final Metrics metrics; - private final Time time; - private final Sensor waitTime; - - /** - * Create a new buffer pool + * A pool of byte buffers that will be reused - * + * - * @param memory The maximum amount of memory that this buffer pool can allocate - * @param poolableSize The buffer size to cache in the free list rather than deallocating - * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the - * {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false - * {@link #allocate(int)} will throw an exception if the buffer is out of memory. - * @param metrics instance of Metrics - * @param time time instance - * @param metricGrpName logical group name for metrics - * @param metricTags additional key/val attributes for metrics - */ + */ - public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time , String metricGrpName , Map metricTags) { - this.poolableSize = poolableSize; - this.blockOnExhaustion = blockOnExhaustion; - this.lock = new ReentrantLock(); - this.free = new ArrayDeque(); - this.waiters = new ArrayDeque(); - this.totalMemory = memory; - this.availableMemory = memory; - this.metrics = metrics; - this.time = time; - this.waitTime = this.metrics.sensor("bufferpool-wait-time"); - MetricName metricName = new MetricName("bufferpool-wait-ratio", - metricGrpName, - "The fraction of time an appender waits for space allocation.", - metricTags); - this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); - } +public interface BufferPool { /** * Allocate a buffer of the given size. 
This method blocks if there is not enough memory and the buffer pool @@ -97,174 +35,49 @@ * forever) * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool */ - public ByteBuffer allocate(int size) throws InterruptedException { - if (size > this.totalMemory) - throw new IllegalArgumentException("Attempt to allocate " + size - + " bytes, but there is a hard limit of " - + this.totalMemory - + " on memory allocations."); + public ByteBuffer allocate(int size) throws InterruptedException; - this.lock.lock(); - try { - // check if we have a free buffer of the right size pooled - if (size == poolableSize && !this.free.isEmpty()) - return this.free.pollFirst(); - - // now check if the request is immediately satisfiable with the - // memory on hand or if we need to block - int freeListSize = this.free.size() * this.poolableSize; - if (this.availableMemory + freeListSize >= size) { - // we have enough unallocated or pooled memory to immediately - // satisfy the request - freeUp(size); - this.availableMemory -= size; - lock.unlock(); - return ByteBuffer.allocate(size); - } else if (!blockOnExhaustion) { - throw new BufferExhaustedException("You have exhausted the " + this.totalMemory - + " bytes of memory you configured for the client and the client is configured to error" - + " rather than block when memory is exhausted."); - } else { - // we are out of memory and will have to block - int accumulated = 0; - ByteBuffer buffer = null; - Condition moreMemory = this.lock.newCondition(); - this.waiters.addLast(moreMemory); - // loop over and over until we have a buffer or have reserved - // enough memory to allocate one - while (accumulated < size) { - long startWait = time.nanoseconds(); - moreMemory.await(); - long endWait = time.nanoseconds(); - this.waitTime.record(endWait - startWait, time.milliseconds()); - - // check if we can satisfy this request from the free list, - // otherwise allocate memory - if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { - // just grab a buffer from the free list - buffer = this.free.pollFirst(); - accumulated = size; - } else { - // we'll need to allocate memory, but we may only get - // part of what we need on this iteration - freeUp(size - accumulated); - int got = (int) Math.min(size - accumulated, this.availableMemory); - this.availableMemory -= got; - accumulated += got; - } - } - - // remove the condition for this thread to let the next thread - // in line start getting memory - Condition removed = this.waiters.removeFirst(); - if (removed != moreMemory) - throw new IllegalStateException("Wrong condition: this shouldn't happen."); - - // signal any additional waiters if there is more memory left - // over for them - if (this.availableMemory > 0 || !this.free.isEmpty()) { - if (!this.waiters.isEmpty()) - this.waiters.peekFirst().signal(); - } - - // unlock and return the buffer - lock.unlock(); - if (buffer == null) - return ByteBuffer.allocate(size); - else - return buffer; - } - } finally { - if (lock.isHeldByCurrentThread()) - lock.unlock(); - } - } - /** - * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled - * buffers (if needed) + * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the + * memory as free. 
+     *
+     * @param buffer The buffer to return
+     * @param size The size of the buffer to mark as deallocated, note that this may be smaller than buffer.capacity
+     * since the buffer may re-allocate itself during in-place compression
+     */
-    private void freeUp(int size) {
-        while (!this.free.isEmpty() && this.availableMemory < size)
-            this.availableMemory += this.free.pollLast().capacity();
-    }
+    public void deallocate(ByteBuffer buffer, int size);

     /**
      * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
      * memory as free.
      *
      * @param buffer The buffer to return
-     * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity
-     * since the buffer may re-allocate itself during in-place compression
      */
-    public void deallocate(ByteBuffer buffer, int size) {
-        lock.lock();
-        try {
-            if (size == this.poolableSize && size == buffer.capacity()) {
-                buffer.clear();
-                this.free.add(buffer);
-            } else {
-                this.availableMemory += size;
-            }
-            Condition moreMem = this.waiters.peekFirst();
-            if (moreMem != null)
-                moreMem.signal();
-        } finally {
-            lock.unlock();
-        }
-    }
+    public void deallocate(ByteBuffer buffer);

-    public void deallocate(ByteBuffer buffer) {
-        deallocate(buffer, buffer.capacity());
-    }
-
     /**
      * the total free memory both unallocated and in the free list
      */
-    public long availableMemory() {
-        lock.lock();
-        try {
-            return this.availableMemory + this.free.size() * this.poolableSize;
-        } finally {
-            lock.unlock();
-        }
-    }
+    public long availableMemory();

     /**
      * Get the unallocated memory (not in the free list or in use)
      */
-    public long unallocatedMemory() {
-        lock.lock();
-        try {
-            return this.availableMemory;
-        } finally {
-            lock.unlock();
-        }
-    }
+    public long unallocatedMemory();

     /**
      * The number of threads blocked waiting on memory
      */
-    public int queued() {
-        lock.lock();
-        try {
-            return this.waiters.size();
-        } finally {
-            lock.unlock();
-        }
-    }
+    public int queued();

     /**
      * The buffer size that will be retained in the free list after use
      */
-    public int poolableSize() {
-        return this.poolableSize;
-    }
+    public int poolableSize();

     /**
      * The total memory managed by this pool
      */
-    public long totalMemory() {
-        return this.totalMemory;
-    }
+    public long totalMemory();
+}
\ No newline at end of file
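For orientation, a sketch of how a caller picks an implementation behind this interface; it mirrors the RecordAccumulator change below, and the variable names (backingFile, totalSize, batchSize, blockOnBufferFull, metrics, time, metricTags) are assumed to be in scope:

    // backingFile == null preserves the existing heap-based behavior
    BufferPool pool = (backingFile == null)
        ? new ByteBufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time, "producer-metrics", metricTags)
        : new MmapBufferPool(backingFile, totalSize, batchSize, blockOnBufferFull); // throws IOException; the accumulator wraps it in a KafkaException

Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (date 1438901563000)
+++ clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (revision )
@@ -3,9 +3,9 @@
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
- *
+ *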

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -13,11 +13,7 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.*; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -33,23 +29,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} * instances to be sent to the server. - *

+ *

* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
 * this behavior is explicitly disabled.
 */
@@ -73,26 +63,27 @@
     /**
      * Create a new record accumulator
-     *
+     *
-     * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
+     * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
-     * @param totalSize The maximum memory the record accumulator can use.
+     * @param totalSize The maximum memory the record accumulator can use.
-     * @param compression The compression codec for the records
+     * @param compression The compression codec for the records
-     * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
+     * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
-     * sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
+     * sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
-     * latency for potentially better throughput due to more batching (and hence fewer, larger requests).
+     * latency for potentially better throughput due to more batching (and hence fewer, larger requests).
-     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
+     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
-     * exhausting all retries in a short period of time.
+     * exhausting all retries in a short period of time.
+     * @param backingFile If non-null, the file to memory-map and allocate record buffers from; if null, buffers are
+     * allocated on the heap
      * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of
-     * memory
+     * memory
-     * @param metrics The metrics
+     * @param metrics The metrics
-     * @param time The time instance to use
+     * @param time The time instance to use
-     * @param metricTags additional key/value attributes of the metric
+     * @param metricTags additional key/value attributes of the metric
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
                              CompressionType compression,
                              long lingerMs,
                              long retryBackoffMs,
+                             File backingFile,
                              boolean blockOnBufferFull,
                              Metrics metrics,
                              Time time,
@@ -107,7 +98,15 @@
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         String metricGrpName = "producer-metrics";
-        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
+        if (backingFile == null) {
+            this.free = new ByteBufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time, metricGrpName, metricTags);
+        } else {
+            try {
+                this.free = new MmapBufferPool(backingFile, totalSize, batchSize, blockOnBufferFull);
+            } catch (IOException e) {
+                throw new KafkaException("Failed to create mmap-based buffer.", e);
+            }
+        }
         this.incomplete = new IncompleteRecordBatches();
         this.time = time;
         registerMetrics(metrics, metricGrpName, metricTags);
@@ -145,13 +144,13 @@
     /**
      * Add a record to the accumulator, return the append result
-     *

+ *

* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created - *

+ *

* - * @param tp The topic/partition to which this record is being sent + * @param tp The topic/partition to which this record is being sent - * @param key The key for the record + * @param key The key for the record - * @param value The value for the record + * @param value The value for the record * @param callback The user-supplied callback to execute when the request is complete */ public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException { @@ -218,7 +217,7 @@ * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated * partition batches. - *

+ *

* A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the * following are true : *

    @@ -286,11 +285,11 @@ /** * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over. - * + * * @param cluster The current cluster metadata - * @param nodes The list of node to drain + * @param nodes The list of node to drain * @param maxSize The maximum number of bytes to drain - * @param now The current unix time in milliseconds + * @param now The current unix time in milliseconds * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize. */ public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) { @@ -356,14 +355,14 @@ incomplete.remove(batch); free.deallocate(batch.records.buffer(), batch.records.capacity()); } - + /** * Are there any threads currently waiting on a flush? */ private boolean flushInProgress() { return flushesInProgress.get() > 0; } - + /** * Initiate the flushing of data from the accumulator...this makes all requests immediately ready */ @@ -382,7 +381,7 @@ * Mark all partitions as ready to send and block until the send is complete */ public void awaitFlushCompletion() throws InterruptedException { - for (RecordBatch batch: this.incomplete.all()) + for (RecordBatch batch : this.incomplete.all()) batch.produceFuture.await(); this.flushesInProgress.decrementAndGet(); } @@ -457,7 +456,7 @@ this.unknownLeadersExist = unknownLeadersExist; } } - + /* * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet */ @@ -467,13 +466,13 @@ public IncompleteRecordBatches() { this.incomplete = new HashSet(); } - + public void add(RecordBatch batch) { synchronized (incomplete) { this.incomplete.add(batch); } } - + public void remove(RecordBatch batch) { synchronized (incomplete) { boolean removed = this.incomplete.remove(batch); @@ -481,7 +480,7 @@ throw new IllegalStateException("Remove from the incomplete set failed. 
This should be impossible."); } } - + public Iterable all() { synchronized (incomplete) { return new ArrayList(this.incomplete); Index: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (date 1438901563000) +++ clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (revision ) @@ -52,7 +52,7 @@ private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); - private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags); + private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, null, false, metrics, time, metricTags); private Sender sender = new Sender(client, metadata, this.accumulator, Index: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (date 1438901563000) +++ clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (revision ) @@ -12,14 +12,6 @@ */ package org.apache.kafka.clients.producer; -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; -import static org.apache.kafka.common.config.ConfigDef.Range.between; -import static org.apache.kafka.common.config.ConfigDef.ValidString.in; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -27,6 +19,16 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.serialization.Serializer; +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.ValidString.in; + /** * Configuration for the Kafka Producer. 
Documentation for these configurations can be found in the Kafka documentation
@@ -176,6 +178,12 @@
     public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
     private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the Partitioner interface.";

+    public static final String FILE_BACKED_BUFFERS_CONFIG = "file.backed.buffers.enable";
+    private static final String FILE_BACKED_BUFFERS_DOC = "If true, the producer allocates its record buffers from a memory-mapped file rather than from the Java heap.";
+
+    public static final String FILE_BACKED_BUFFERS_FILE_NAME_CONFIG = "file.buffer.file.name";
+    private static final String FILE_BACKED_BUFFERS_FILE_NAME_DOC = "The path of the file that backs the producer's buffers when file.backed.buffers.enable is true. Defaults to a randomly named file under java.io.tmpdir.";
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -225,9 +233,16 @@
                                 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
                                 .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
+                                .define(FILE_BACKED_BUFFERS_CONFIG, Type.BOOLEAN, false, Importance.LOW, FILE_BACKED_BUFFERS_DOC)
+                                .define(FILE_BACKED_BUFFERS_FILE_NAME_CONFIG,
+                                        Type.STRING,
+                                        new File(System.getProperty("java.io.tmpdir"), "kafka-producer-data-" + new Random().nextInt(Integer.MAX_VALUE) + ".dat").getAbsolutePath(),
+                                        Importance.LOW,
+                                        FILE_BACKED_BUFFERS_FILE_NAME_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
+
     }

     public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
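For context, here is how a client could opt in to the new settings once this patch is applied (a sketch; the values and file path are illustrative, not defaults):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.FILE_BACKED_BUFFERS_CONFIG, true);
    props.put(ProducerConfig.FILE_BACKED_BUFFERS_FILE_NAME_CONFIG, "/var/tmp/producer-buffers.dat"); // optional; defaults to a random file in java.io.tmpdir
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/MmapBufferPool.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- clients/src/main/java/org/apache/kafka/clients/producer/internals/MmapBufferPool.java (revision )
+++ clients/src/main/java/org/apache/kafka/clients/producer/internals/MmapBufferPool.java (revision )
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.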
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MmapBufferPool implements BufferPool {
+
+    private final long maxSize;
+    private final int chunkSize;
+    private final boolean blockOnExhaustion;
+    private final BlockingDeque<ByteBuffer> free;
+    private final AtomicInteger waiters = new AtomicInteger(0);
+
+    public MmapBufferPool(File backingFile, long maxSize, int chunkSize, boolean blockOnExhaustion) throws IOException {
+        this.maxSize = maxSize;
+        this.chunkSize = chunkSize;
+        this.blockOnExhaustion = blockOnExhaustion;
+        this.free = new LinkedBlockingDeque<ByteBuffer>();
+        // map the whole backing file once, then carve the mapping into fixed-size chunks
+        RandomAccessFile f = new RandomAccessFile(backingFile, "rw");
+        f.setLength(maxSize);
+        MappedByteBuffer buffer = f.getChannel().map(MapMode.READ_WRITE, 0, maxSize);
+        while (buffer.remaining() >= chunkSize) {
+            ByteBuffer b = buffer.slice();
+            b.limit(chunkSize);
+            buffer.position(buffer.position() + chunkSize);
+            free.add(b);
+        }
+        // closing the file is safe; the mapping remains valid until the buffer is garbage-collected
+        f.close();
+    }
+
+    @Override
+    public ByteBuffer allocate(int size) throws InterruptedException {
+        if (size > chunkSize)
+            throw new IllegalArgumentException("Attempt to allocate " + size
+                    + " bytes, but this pool only hands out chunks of " + chunkSize + " bytes.");
+        ByteBuffer buffer;
+        if (blockOnExhaustion) {
+            waiters.incrementAndGet();
+            try {
+                buffer = this.free.take();
+            } finally {
+                waiters.decrementAndGet();
+            }
+        } else {
+            buffer = this.free.poll();
+            if (buffer == null) {
+                throw new BufferExhaustedException("You have exhausted the " + this.maxSize
+                        + " bytes of memory you configured for the client and the client is configured to error"
+                        + " rather than block when memory is exhausted.");
+            }
+        }
+        return buffer;
+    }
+
+    @Override
+    public void deallocate(ByteBuffer buffer, int size) {
+        // reset the chunk before recycling it; clear() alone would be wrong because a slice's
+        // capacity can extend past the chunk, so the limit must be restored explicitly
+        buffer.position(0);
+        buffer.limit(chunkSize);
+        this.free.add(buffer);
+    }
+
+    @Override
+    public void deallocate(ByteBuffer buffer) {
+        deallocate(buffer, chunkSize);
+    }
+
+    @Override
+    public long availableMemory() {
+        // all pool memory is pre-carved into chunks, so the free chunks are the free memory
+        return (long) this.free.size() * this.chunkSize;
+    }
+
+    @Override
+    public long unallocatedMemory() {
+        // the backing file is mapped and sliced up front, so nothing is ever unallocated
+        return 0;
+    }
+
+    @Override
+    public int queued() {
+        return waiters.get();
+    }
+
+    @Override
+    public int poolableSize() {
+        return this.chunkSize;
+    }
+
+    @Override
+    public long totalMemory() {
+        return this.maxSize;
+    }
+}
Index: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java (date 1438901563000)
+++ clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java (revision )
@@ -36,7 +36,7 @@
     // Construct a writable memory records
     private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) {
         this.writable = writable;
-        this.capacity = buffer.capacity();
+        this.capacity = buffer.limit(); // limit, not capacity: file-backed buffers are slices whose capacity runs past the chunk
         this.sizeLimit = sizeLimit;
         if (this.writable) {
             this.buffer = null;
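A quick sanity sketch of the chunked allocation above (hypothetical test-style code, not part of the patch; exceptions elided):

    File backing = File.createTempFile("kafka-producer-data-", ".dat");
    BufferPool pool = new MmapBufferPool(backing, 64 * 1024, 16 * 1024, true);
    ByteBuffer chunk = pool.allocate(16 * 1024); // anything larger than one chunk throws IllegalArgumentException
    assert chunk.limit() == 16 * 1024;           // the limit, not the capacity, bounds each chunk
    pool.deallocate(chunk);                      // the chunk goes back on the free deque

Index: clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (date 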
1438901563000) +++ clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (revision ) @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + *

    * http://www.apache.org/licenses/LICENSE-2.0 - * + *

    * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -67,7 +67,7 @@ @Test public void testFull() throws Exception { long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, null, false, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { accum.append(tp1, key, value, null); @@ -91,7 +91,7 @@ @Test public void testAppendLarge() throws Exception { int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, null, false, metrics, time, metricTags); accum.append(tp1, key, new byte[2 * batchSize], null); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @@ -99,7 +99,7 @@ @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, null, false, metrics, time, metricTags); accum.append(tp1, key, value, null); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); @@ -117,7 +117,7 @@ @Test public void testPartialDrain() throws Exception { - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, null, false, metrics, time, metricTags); int appends = 1024 / msgSize + 1; List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { @@ -136,7 +136,7 @@ final int numThreads = 5; final int msgs = 10000; final int numParts = 2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, null, true, metrics, time, metricTags); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { @@ -177,7 +177,7 @@ public void testNextReadyCheckDelay() throws Exception { // Next check time will use lingerMs since this test won't trigger any retries/backoff long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, null, false, metrics, time, metricTags); // Just short of going over the limit so we trigger linger time int appends = 1024 / msgSize; @@ -211,7 +211,7 @@ public void testRetryBackoff() throws Exception { long lingerMs = Long.MAX_VALUE / 4; long retryBackoffMs = Long.MAX_VALUE / 
2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, null, false, metrics, time, metricTags); long now = time.milliseconds(); accum.append(tp1, key, value, null); @@ -244,25 +244,25 @@ assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition); } - + @Test public void testFlush() throws Exception { long lingerMs = Long.MAX_VALUE; - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, null, false, metrics, time, metricTags); for (int i = 0; i < 100; i++) accum.append(new TopicPartition(topic, i % 3), key, value, null); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - + accum.beginFlush(); result = accum.ready(cluster, time.milliseconds()); - + // drain and deallocate all batches Map> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); - for (List batches: results.values()) + for (List batches : results.values()) - for (RecordBatch batch: batches) + for (RecordBatch batch : batches) accum.deallocate(batch); - + // should be complete with no unsent records. accum.awaitFlushCompletion(); assertFalse(accum.hasUnsent()); @@ -272,7 +272,7 @@ public void testAbortIncompleteBatches() throws Exception { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, null, false, metrics, time, metricTags); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) {