From a541b4042ffe0ef25f5e776ca9799359688d4d9b Mon Sep 17 00:00:00 2001
From: Jay Kreps
Date: Sat, 14 Feb 2015 13:42:00 -0800
Subject: [PATCH] KAFKA-1955 WIP: Allow a file buffer in the producer.

---
 .../kafka/clients/producer/KafkaProducer.java      |   4 +
 .../kafka/clients/producer/ProducerConfig.java     |  16 +-
 .../clients/producer/internals/BufferPool.java     | 237 ++------------------
 .../clients/producer/internals/ByteBufferPool.java | 244 +++++++++++++++++++++
 .../clients/producer/internals/MmapBufferPool.java |  98 +++++++++
 .../producer/internals/RecordAccumulator.java      |  14 +-
 .../apache/kafka/common/record/MemoryRecords.java  |   2 +-
 .../kafka/clients/producer/BufferPoolTest.java     |  11 +-
 .../clients/producer/RecordAccumulatorTest.java    |  12 +-
 .../apache/kafka/clients/producer/SenderTest.java  |   2 +-
 10 files changed, 406 insertions(+), 234 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/ByteBufferPool.java
 create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/internals/MmapBufferPool.java

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1fd6917..7646d6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients.producer;
 
+import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
@@ -177,12 +178,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
         this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
+        boolean enableBufferBackingFile = config.getBoolean(ProducerConfig.FILE_BACKED_BUFFERS_CONFIG);
+        File backingFile = enableBufferBackingFile ? new File(config.getString(ProducerConfig.FILE_BACKED_BUFFERS_FILE_NAME_CONFIG)) : null;
         Map<String, String> metricTags = new LinkedHashMap<String, String>();
         metricTags.put("client-id", clientId);
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                 this.totalMemorySize,
                 config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                 retryBackoffMs,
+                backingFile,
                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                 metrics,
                 time,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 122375c..0ae2137 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -16,7 +16,9 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
+import java.io.File;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
@@ -166,6 +168,12 @@ public class ProducerConfig extends AbstractConfig {
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
     private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface.";
 
+    public static final String FILE_BACKED_BUFFERS_CONFIG = "file.backed.buffers.enable";
+    private static final String FILE_BACKED_BUFFERS_DOC = "If true the producer will allocate the memory used to buffer records from a memory-mapped file rather than from the JVM heap. This keeps the buffer out of the heap and allows the operating system to page it to disk as needed.";
+
+    public static final String FILE_BACKED_BUFFERS_FILE_NAME_CONFIG = "file.buffer.file.name";
+    private static final String FILE_BACKED_BUFFERS_FILE_NAME_DOC = "The path of the file used to back the producer's buffer memory when file.backed.buffers.enable is set to true. If not set, a randomly named file in the JVM's temporary directory will be used.";
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -214,7 +222,13 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.LOW,
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
+                                .define(FILE_BACKED_BUFFERS_CONFIG, Type.BOOLEAN, false, Importance.LOW, FILE_BACKED_BUFFERS_DOC)
+                                .define(FILE_BACKED_BUFFERS_FILE_NAME_CONFIG,
+                                        Type.STRING,
+                                        new File(System.getProperty("java.io.tmpdir"), "kafka-producer-data-" + new Random().nextInt(Integer.MAX_VALUE) + ".dat").getAbsolutePath(),
+                                        Importance.LOW,
+                                        FILE_BACKED_BUFFERS_FILE_NAME_DOC);
     }
 
     ProducerConfig(Map<?, ?> props) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 4cb1e50..7d0112a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -1,90 +1,14 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.kafka.clients.producer.internals;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.kafka.clients.producer.BufferExhaustedException;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.utils.Time;
-
 /**
- * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
- * particular it has the following properties:
- * <ol>
- * <li>There is a special "poolable size" and buffers of this size are kept in a free list and recycled
- * <li>It is fair. That is all memory is given to the longest waiting thread until it has sufficient memory. This
- * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple
- * buffers are deallocated.
- * </ol>
+ * A pool of byte buffers that are kept under a given memory limit and reused.
  */
-public final class BufferPool {
-
-    private final long totalMemory;
-    private final int poolableSize;
-    private final boolean blockOnExhaustion;
-    private final ReentrantLock lock;
-    private final Deque<ByteBuffer> free;
-    private final Deque<Condition> waiters;
-    private long availableMemory;
-    private final Metrics metrics;
-    private final Time time;
-    private final Sensor waitTime;
-
-    /**
-     * Create a new buffer pool
-     *
-     * @param memory The maximum amount of memory that this buffer pool can allocate
-     * @param poolableSize The buffer size to cache in the free list rather than deallocating
-     * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the
-     *        {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
-     *        {@link #allocate(int)} will throw an exception if the buffer is out of memory.
-     * @param metrics instance of Metrics
-     * @param time time instance
-     * @param metricGrpName logical group name for metrics
-     * @param metricTags additional key/val attributes for metrics
-     */
-    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time, String metricGrpName, Map<String, String> metricTags) {
-        this.poolableSize = poolableSize;
-        this.blockOnExhaustion = blockOnExhaustion;
-        this.lock = new ReentrantLock();
-        this.free = new ArrayDeque<ByteBuffer>();
-        this.waiters = new ArrayDeque<Condition>();
-        this.totalMemory = memory;
-        this.availableMemory = memory;
-        this.metrics = metrics;
-        this.time = time;
-        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
-        MetricName metricName = new MetricName("bufferpool-wait-ratio",
-                                               metricGrpName,
-                                               "The fraction of time an appender waits for space allocation.",
-                                               metricTags);
-        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
-    }
+public interface BufferPool {
 
     /**
      * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
@@ -97,97 +21,7 @@ public final class BufferPool {
      *         forever)
      * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool
      */
-    public ByteBuffer allocate(int size) throws InterruptedException {
-        if (size > this.totalMemory)
-            throw new IllegalArgumentException("Attempt to allocate " + size
-                                               + " bytes, but there is a hard limit of "
-                                               + this.totalMemory
-                                               + " on memory allocations.");
-
-        this.lock.lock();
-        try {
-            // check if we have a free buffer of the right size pooled
-            if (size == poolableSize && !this.free.isEmpty())
-                return this.free.pollFirst();
-
-            // now check if the request is immediately satisfiable with the
-            // memory on hand or if we need to block
-            int freeListSize = this.free.size() * this.poolableSize;
-            if (this.availableMemory + freeListSize >= size) {
-                // we have enough unallocated or pooled memory to immediately
-                // satisfy the request
-                freeUp(size);
-                this.availableMemory -= size;
-                lock.unlock();
-                return ByteBuffer.allocate(size);
-            } else if (!blockOnExhaustion) {
-                throw new BufferExhaustedException("You have exhausted the " + this.totalMemory
-                                                   + " bytes of memory you configured for the client and the client is configured to error"
-                                                   + " rather than block when memory is exhausted.");
-            } else {
-                // we are out of memory and will have to block
-                int accumulated = 0;
-                ByteBuffer buffer = null;
-                Condition moreMemory = this.lock.newCondition();
-                this.waiters.addLast(moreMemory);
-                // loop over and over until we have a buffer or have reserved
-                // enough memory to allocate one
-                while (accumulated < size) {
-                    long startWait = time.nanoseconds();
-                    moreMemory.await();
-                    long endWait = time.nanoseconds();
-                    this.waitTime.record(endWait - startWait, time.milliseconds());
-
-                    // check if we can satisfy this request from the free list,
-                    // otherwise allocate memory
-                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
-                        // just grab a buffer from the free list
-                        buffer = this.free.pollFirst();
-                        accumulated = size;
-                    } else {
-                        // we'll need to allocate memory, but we may only get
-                        // part of what we need on this iteration
-                        freeUp(size - accumulated);
-                        int got = (int) Math.min(size - accumulated, this.availableMemory);
-                        this.availableMemory -= got;
-                        accumulated += got;
-                    }
-                }
-
-                // remove the condition for this thread to let the next thread
-                // in line start getting memory
-                Condition removed = this.waiters.removeFirst();
-                if (removed != moreMemory)
-                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");
-
-                // signal any additional waiters if there is more memory left
-                // over for them
-                if (this.availableMemory > 0 || !this.free.isEmpty()) {
-                    if (!this.waiters.isEmpty())
-                        this.waiters.peekFirst().signal();
-                }
-
-                // unlock and return the buffer
-                lock.unlock();
-                if (buffer == null)
-                    return ByteBuffer.allocate(size);
-                else
-                    return buffer;
-            }
-        } finally {
-            if (lock.isHeldByCurrentThread())
-                lock.unlock();
-        }
-    }
-
-    /**
-     * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
-     * buffers (if needed)
-     */
-    private void freeUp(int size) {
-        while (!this.free.isEmpty() && this.availableMemory < size)
-            this.availableMemory += this.free.pollLast().capacity();
-    }
+    public ByteBuffer allocate(int size) throws InterruptedException;
 
     /**
      * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
      * memory as free.
      *
      * @param buffer The buffer to return
      * @param size The size of the buffer to mark as deallocated, note that this may be smaller than buffer.capacity
      *             since the buffer may re-allocate itself during in-place compression
      */
-    public void deallocate(ByteBuffer buffer, int size) {
-        lock.lock();
-        try {
-            if (size == this.poolableSize && size == buffer.capacity()) {
-                buffer.clear();
-                this.free.add(buffer);
-            } else {
-                this.availableMemory += size;
-            }
-            Condition moreMem = this.waiters.peekFirst();
-            if (moreMem != null)
-                moreMem.signal();
-        } finally {
-            lock.unlock();
-        }
-    }
+    public void deallocate(ByteBuffer buffer, int size);
 
-    public void deallocate(ByteBuffer buffer) {
-        deallocate(buffer, buffer.capacity());
-    }
+    /**
+     * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
+     * memory as free.
+     *
+     * @param buffer The buffer to return
+     */
+    public void deallocate(ByteBuffer buffer);
 
     /**
      * The total free memory both unallocated and in the free list
      */
-    public long availableMemory() {
-        lock.lock();
-        try {
-            return this.availableMemory + this.free.size() * this.poolableSize;
-        } finally {
-            lock.unlock();
-        }
-    }
+    public long availableMemory();
 
     /**
      * Get the unallocated memory (not in the free list or in use)
      */
-    public long unallocatedMemory() {
-        lock.lock();
-        try {
-            return this.availableMemory;
-        } finally {
-            lock.unlock();
-        }
-    }
+    public long unallocatedMemory();
 
     /**
      * The number of threads blocked waiting on memory
      */
-    public int queued() {
-        lock.lock();
-        try {
-            return this.waiters.size();
-        } finally {
-            lock.unlock();
-        }
-    }
+    public int queued();
 
     /**
      * The buffer size that will be retained in the free list after use
      */
-    public int poolableSize() {
-        return this.poolableSize;
-    }
+    public int poolableSize();
 
     /**
      * The total memory managed by this pool
      */
-    public long totalMemory() {
-        return this.totalMemory;
-    }
+    public long totalMemory();
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ByteBufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ByteBufferPool.java
new file mode 100644
index 0000000..a877b64
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ByteBufferPool.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.Time;
+
+
+/**
+ * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
+ * particular it has the following properties:
+ * <ol>
+ * <li>There is a special "poolable size" and buffers of this size are kept in a free list and recycled
+ * <li>It is fair. That is all memory is given to the longest waiting thread until it has sufficient memory. This
+ * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple
+ * buffers are deallocated.
+ * </ol>
+ */
+public final class ByteBufferPool implements BufferPool {
+
+    private final long totalMemory;
+    private final int poolableSize;
+    private final boolean blockOnExhaustion;
+    private final ReentrantLock lock;
+    private final Deque<ByteBuffer> free;
+    private final Deque<Condition> waiters;
+    private long availableMemory;
+    private final Metrics metrics;
+    private final Time time;
+    private final Sensor waitTime;
+
+    /**
+     * Create a new buffer pool
+     *
+     * @param memory The maximum amount of memory that this buffer pool can allocate
+     * @param poolableSize The buffer size to cache in the free list rather than deallocating
+     * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the
+     *        {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
+     *        {@link #allocate(int)} will throw an exception if the buffer is out of memory.
+     * @param metrics instance of Metrics
+     * @param time time instance
+     * @param metricGrpName logical group name for metrics
+     * @param metricTags additional key/val attributes for metrics
+     */
+    public ByteBufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time, String metricGrpName, Map<String, String> metricTags) {
+        this.poolableSize = poolableSize;
+        this.blockOnExhaustion = blockOnExhaustion;
+        this.lock = new ReentrantLock();
+        this.free = new ArrayDeque<ByteBuffer>();
+        this.waiters = new ArrayDeque<Condition>();
+        this.totalMemory = memory;
+        this.availableMemory = memory;
+        this.metrics = metrics;
+        this.time = time;
+        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
+        MetricName metricName = new MetricName("bufferpool-wait-ratio",
+                                               metricGrpName,
+                                               "The fraction of time an appender waits for space allocation.",
+                                               metricTags);
+        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+    }
+
+    @Override
+    public ByteBuffer allocate(int size) throws InterruptedException {
+        if (size > this.totalMemory)
+            throw new IllegalArgumentException("Attempt to allocate " + size
+                                               + " bytes, but there is a hard limit of "
+                                               + this.totalMemory
+                                               + " on memory allocations.");
+
+        this.lock.lock();
+        try {
+            // check if we have a free buffer of the right size pooled
+            if (size == poolableSize && !this.free.isEmpty())
+                return this.free.pollFirst();
+
+            // now check if the request is immediately satisfiable with the
+            // memory on hand or if we need to block
+            int freeListSize = this.free.size() * this.poolableSize;
+            if (this.availableMemory + freeListSize >= size) {
+                // we have enough unallocated or pooled memory to immediately
+                // satisfy the request
+                freeUp(size);
+                this.availableMemory -= size;
+                lock.unlock();
+                return ByteBuffer.allocate(size);
+            } else if (!blockOnExhaustion) {
+                throw new BufferExhaustedException("You have exhausted the " + this.totalMemory
+                                                   + " bytes of memory you configured for the client and the client is configured to error"
+                                                   + " rather than block when memory is exhausted.");
+            } else {
+                // we are out of memory and will have to block
+                int accumulated = 0;
+                ByteBuffer buffer = null;
+                Condition moreMemory = this.lock.newCondition();
+                this.waiters.addLast(moreMemory);
+                // loop over and over until we have a buffer or have reserved
+                // enough memory to allocate one
+                while (accumulated < size) {
+                    long startWait = time.nanoseconds();
+                    moreMemory.await();
+                    long endWait = time.nanoseconds();
+                    this.waitTime.record(endWait - startWait, time.milliseconds());
+
+                    // check if we can satisfy this request from the free list,
+                    // otherwise allocate memory
+                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
+                        // just grab a buffer from the free list
+                        buffer = this.free.pollFirst();
+                        accumulated = size;
+                    } else {
+                        // we'll need to allocate memory, but we may only get
+                        // part of what we need on this iteration
+                        freeUp(size - accumulated);
+                        int got = (int) Math.min(size - accumulated, this.availableMemory);
+                        this.availableMemory -= got;
+                        accumulated += got;
+                    }
+                }
+
+                // remove the condition for this thread to let the next thread
+                // in line start getting memory
+                Condition removed = this.waiters.removeFirst();
+                if (removed != moreMemory)
+                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");
+
+                // signal any additional waiters if there is more memory left
+                // over for them
+                if (this.availableMemory > 0 || !this.free.isEmpty()) {
+                    if (!this.waiters.isEmpty())
+                        this.waiters.peekFirst().signal();
+                }
+
+                // unlock and return the buffer
+                lock.unlock();
+                if (buffer == null)
+                    return ByteBuffer.allocate(size);
+                else
+                    return buffer;
+            }
+        } finally {
+            if (lock.isHeldByCurrentThread())
+                lock.unlock();
+        }
+    }
+
+    /**
+     * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
+     * buffers (if needed)
+     */
+    private void freeUp(int size) {
+        while (!this.free.isEmpty() && this.availableMemory < size)
+            this.availableMemory += this.free.pollLast().capacity();
+    }
+
+    @Override
+    public void deallocate(ByteBuffer buffer, int size) {
+        lock.lock();
+        try {
+            if (size == this.poolableSize && size == buffer.capacity()) {
+                buffer.clear();
+                this.free.add(buffer);
+            } else {
+                this.availableMemory += size;
+            }
+            Condition moreMem = this.waiters.peekFirst();
+            if (moreMem != null)
+                moreMem.signal();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void deallocate(ByteBuffer buffer) {
+        deallocate(buffer, buffer.capacity());
+    }
+
+    @Override
+    public long availableMemory() {
+        lock.lock();
+        try {
+            return this.availableMemory + this.free.size() * this.poolableSize;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public long unallocatedMemory() {
+        lock.lock();
+        try {
+            return this.availableMemory;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public int queued() {
+        lock.lock();
+        try {
+            return this.waiters.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public int poolableSize() {
+        return this.poolableSize;
+    }
+
+    @Override
+    public long totalMemory() {
+        return this.totalMemory;
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/MmapBufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/MmapBufferPool.java
new file mode 100644
index 0000000..1f4e395
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/MmapBufferPool.java
@@ -0,0 +1,98 @@
+package org.apache.kafka.clients.producer.internals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+
+public class MmapBufferPool implements BufferPool {
+
+    private final long maxSize;
+    private final int chunkSize;
+    private final boolean blockOnExhaustion;
+    private final BlockingDeque<ByteBuffer> free;
+
+    public MmapBufferPool(File backingFile, long maxSize, int chunkSize, boolean blockOnExhaustion) throws IOException {
+        this.maxSize = maxSize;
+        this.chunkSize = chunkSize;
+        this.blockOnExhaustion = blockOnExhaustion;
+        this.free = new LinkedBlockingDeque<ByteBuffer>();
+        RandomAccessFile f = new RandomAccessFile(backingFile, "rw");
+        f.setLength(maxSize);
+        MappedByteBuffer buffer = f.getChannel().map(MapMode.READ_WRITE, 0, maxSize);
+        // carve the mapping into fixed-size chunks; each slice's capacity runs to the end
+        // of the mapping, so the limit is what marks the chunk boundary
+        while (buffer.remaining() >= chunkSize) {
+            ByteBuffer b = buffer.slice();
+            b.limit(chunkSize);
+            buffer.position(buffer.position() + chunkSize);
+            free.add(b);
+        }
+        // closing the file does not invalidate the mapping
+        f.close();
+    }
+
+    @Override
+    public ByteBuffer allocate(int size) throws InterruptedException {
+        if (size > chunkSize)
+            throw new IllegalArgumentException("Attempt to allocate " + size
+                                               + " bytes, but the chunk size of this pool is only "
+                                               + chunkSize + " bytes.");
+        ByteBuffer buffer;
+        if (blockOnExhaustion) {
+            buffer = this.free.take();
+        } else {
+            buffer = this.free.poll();
+            if (buffer == null)
+                throw new BufferExhaustedException("You have exhausted the " + this.maxSize
+                                                   + " bytes of memory you configured for the client and the client is configured to error"
+                                                   + " rather than block when memory is exhausted.");
+        }
+        return buffer;
+    }
+
+    @Override
+    public void deallocate(ByteBuffer buffer, int size) {
+        // reset the chunk for reuse; clear() would be wrong here since a slice's
+        // capacity extends past the chunk boundary
+        buffer.limit(this.chunkSize);
+        buffer.position(0);
+        this.free.add(buffer);
+    }
+
+    @Override
+    public void deallocate(ByteBuffer buffer) {
+        deallocate(buffer, this.chunkSize);
+    }
+
+    @Override
+    public long availableMemory() {
+        // all free memory is in the free list
+        return (long) this.free.size() * this.chunkSize;
+    }
+
+    @Override
+    public long unallocatedMemory() {
+        // the whole file is carved into chunks up front, so nothing is ever unallocated
+        return 0;
+    }
+
+    @Override
+    public int queued() {
+        // TODO: LinkedBlockingDeque does not expose its count of blocked takers
+        return 0;
+    }
+
+    @Override
+    public int poolableSize() {
+        return this.chunkSize;
+    }
+
+    @Override
+    public long totalMemory() {
+        return this.maxSize;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ecfe214..ee1c43e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -12,6 +12,8 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -26,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -83,6 +86,7 @@ public final class RecordAccumulator {
                              long totalSize,
                              long lingerMs,
                              long retryBackoffMs,
+                             File backingFile,
                              boolean blockOnBufferFull,
                              Metrics metrics,
                              Time time,
@@ -94,7 +98,15 @@ public final class RecordAccumulator {
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         String metricGrpName = "producer-metrics";
-        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time, metricGrpName, metricTags);
+        if (backingFile == null) {
+            this.free = new ByteBufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time, metricGrpName, metricTags);
+        } else {
+            try {
+                this.free = new MmapBufferPool(backingFile, totalSize, batchSize, blockOnBufferFull);
+            } catch (IOException e) {
+                throw new KafkaException("Failed to create mmap-based buffer.", e);
+            }
+        }
         this.time = time;
         registerMetrics(metrics, metricGrpName, metricTags);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 083e7a3..6310cb7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -36,7 +36,7 @@ public class MemoryRecords implements Records {
     // Construct a writable memory records
     private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) {
         this.writable = writable;
-        this.capacity = buffer.capacity();
+        this.capacity = buffer.limit();
         this.sizeLimit = sizeLimit;
         if (this.writable) {
             this.buffer = null;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
index 4ae43ed..9c21fb7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.clients.producer.internals.ByteBufferPool;
 import org.apache.kafka.clients.producer.internals.BufferPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
@@ -45,7 +46,7 @@ public class BufferPoolTest {
     public void testSimple() throws Exception {
         long totalMemory = 64 * 1024;
         int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new ByteBufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
         ByteBuffer buffer = pool.allocate(size);
         assertEquals("Buffer size should equal requested size.", size, buffer.limit());
         assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
@@ -72,7 +73,7 @@ public class BufferPoolTest {
      */
     @Test(expected = IllegalArgumentException.class)
     public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new ByteBufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
         ByteBuffer buffer = pool.allocate(1024);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
@@ -81,7 +82,7 @@ public class BufferPoolTest {
 
     @Test
     public void testNonblockingMode() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new ByteBufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
         pool.allocate(1);
         try {
             pool.allocate(2);
@@ -96,7 +97,7 @@ public class BufferPoolTest {
      */
     @Test
     public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
+        BufferPool pool = new ByteBufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
         ByteBuffer buffer = pool.allocate(1024);
         CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
@@ -147,7 +148,7 @@ public class BufferPoolTest {
         final int iterations = 50000;
         final int poolableSize = 1024;
         final int totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
+        final BufferPool pool = new ByteBufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++)
             threads.add(new StressTestThread(pool, iterations));
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 8333863..d12a16d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -67,7 +67,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, null, false, metrics, time, metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, CompressionType.NONE, null);
@@ -90,7 +90,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, null, false, metrics, time, metricTags);
         accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -98,7 +98,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, null, false, metrics, time, metricTags);
         accum.append(tp1, key, value, CompressionType.NONE, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -115,7 +115,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, null, false, metrics, time, metricTags);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -134,7 +134,7 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, null, true, metrics, time, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -174,7 +174,7 @@ public class RecordAccumulatorTest {
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, null, false, metrics, time, metricTags);
 
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 558942a..4017414 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -53,7 +53,7 @@ public class SenderTest {
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, null, false, metrics, time, metricTags);
    private Sender sender = new Sender(client, metadata, this.accumulator,
-- 
1.9.3 (Apple Git-50)
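
For anyone who wants to try the patch out: the two new keys defined in ProducerConfig above are all that is needed to switch the accumulator onto the mmap-backed pool. A minimal sketch of a driver program follows; the broker address, topic name, file location, and choice of byte-array serializers are illustrative assumptions, not part of the patch.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FileBufferProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // the two settings added by this patch
        props.put("file.backed.buffers.enable", "true");
        // optional; defaults to a randomly named file in java.io.tmpdir
        props.put("file.buffer.file.name", "/tmp/kafka-producer-buffer.dat");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        producer.send(new ProducerRecord<byte[], byte[]>("demo-topic", "hello".getBytes()));
        producer.close();
    }
}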
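The one-line MemoryRecords change (capacity() to limit()) is easy to miss but load-bearing: MmapBufferPool hands out slices of a single large mapping, and a slice's capacity runs to the end of that mapping, while its limit is what marks the chunk boundary. A small self-contained illustration of this ByteBuffer behavior, with a plain heap buffer standing in for the mapped file:

import java.nio.ByteBuffer;

public class SliceLimitDemo {
    public static void main(String[] args) {
        ByteBuffer mapped = ByteBuffer.allocate(64 * 1024); // stands in for the MappedByteBuffer
        int chunkSize = 16 * 1024;

        ByteBuffer chunk = mapped.slice(); // shares content with 'mapped' from position 0
        chunk.limit(chunkSize);            // the chunk boundary

        System.out.println(chunk.capacity()); // 65536 -- extends over the next three chunks
        System.out.println(chunk.limit());    // 16384 -- the real chunk size
        // hence MemoryRecords must size itself by limit(), not capacity()
    }
}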
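The patch does not yet include a test for MmapBufferPool. A sketch of an allocate/deallocate round trip in the style of BufferPoolTest might look like the following; it assumes the deallocate() reset shown above, and the temp-file name and sizes are arbitrary. Using a pool with a single chunk forces the second allocate() to return the recycled buffer.

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.nio.ByteBuffer;

import org.apache.kafka.clients.producer.internals.MmapBufferPool;
import org.junit.Test;

public class MmapBufferPoolTest {

    @Test
    public void testAllocateDeallocateRoundTrip() throws Exception {
        File backingFile = File.createTempFile("kafka-producer-buffer", ".dat");
        backingFile.deleteOnExit();
        int chunkSize = 1024;
        // a pool holding exactly one 1 KB chunk, in non-blocking mode
        MmapBufferPool pool = new MmapBufferPool(backingFile, chunkSize, chunkSize, false);

        ByteBuffer buffer = pool.allocate(chunkSize);
        assertEquals("A chunk should expose exactly the chunk size", chunkSize, buffer.limit());

        buffer.putInt(42); // writes land in the page cache of the backing file
        pool.deallocate(buffer, chunkSize);

        ByteBuffer recycled = pool.allocate(chunkSize);
        assertEquals("A recycled chunk should be reset for reuse", 0, recycled.position());
        assertEquals("A recycled chunk should keep its boundary", chunkSize, recycled.limit());
    }
}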