diff --git a/build.gradle b/build.gradle index 5432c0c..d6fd287 100644 --- a/build.gradle +++ b/build.gradle @@ -151,6 +151,7 @@ project(':core') { compile 'com.101tec:zkclient:0.3' compile 'com.yammer.metrics:metrics-core:2.2.0' compile 'net.sf.jopt-simple:jopt-simple:3.2' + compile 'org.xerial.snappy:snappy-java:1.0.5' testCompile 'junit:junit:4.1' testCompile 'org.easymock:easymock:3.0' @@ -316,8 +317,6 @@ project(':clients') { dependencies { compile "org.slf4j:slf4j-api:1.7.6" - compile 'org.xerial.snappy:snappy-java:1.0.5' - testCompile 'com.novocode:junit-interface:0.9' testRuntime "$slf4jlog4j" } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1ff9174..1ac6943 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -69,7 +69,6 @@ public class KafkaProducer implements Producer { private final Sender sender; private final Metrics metrics; private final Thread ioThread; - private final CompressionType compressionType; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -100,7 +99,6 @@ public class KafkaProducer implements Producer { config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG); - this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), @@ -226,7 +224,7 @@ public class KafkaProducer implements Producer { ensureValidSize(record.key(), record.value()); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback); + FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); this.sender.wakeup(); return future; // For API exceptions return them in the future; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 48706ba..32e12ad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -136,11 +136,6 @@ public class ProducerConfig extends AbstractConfig { public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; /** - * The compression type for all data generated. The default is none (i.e. no compression) - */ - public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; - - /** * Should we register the Kafka metrics as JMX mbeans? 
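Note: the removed KafkaProducer line relied on CompressionType.forName to map the "compression.type" string onto an enum constant. A rough sketch of that lookup for reference only; the method body is assumed, though its tail (the IllegalArgumentException for unknown names) is visible in the CompressionType.java hunk further down:

    // Assumed shape of CompressionType.forName(String); only the enum constants and the
    // "Unknown compression name" error message appear in this patch.
    public static CompressionType forName(String name) {
        if (NONE.name.equals(name))
            return NONE;
        else if (GZIP.name.equals(name))
            return GZIP;
        else if (SNAPPY.name.equals(name))
            return SNAPPY;
        else
            throw new IllegalArgumentException("Unknown compression name: " + name);
    }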
*/ public static final String ENABLE_JMX_CONFIG = "enable.jmx"; @@ -163,10 +158,9 @@ public class ProducerConfig extends AbstractConfig { .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") + .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "") - .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah") - .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah") - .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, ""); + .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah"); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index d1d6c4b..b69866a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -71,7 +71,7 @@ public final class BufferPool { * @param size The buffer size to allocate in bytes * @return The buffer * @throws InterruptedException If the thread is interrupted while blocked - * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block + * @throws IllegalArgument if size is larger than the total memory controlled by the pool (and hence we would block * forever) * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool */ @@ -167,31 +167,28 @@ public final class BufferPool { * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the * memory as free. * - * @param buffer The buffer to return - * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity - * since the buffer may re-allocate itself during in-place compression + * @param buffers The buffers to return */ - public void deallocate(ByteBuffer buffer, int size) { + public void deallocate(ByteBuffer... 
buffers) { lock.lock(); try { - if (size == this.poolableSize && size == buffer.capacity()) { - buffer.clear(); - this.free.add(buffer); - } else { - this.availableMemory += size; + for (int i = 0; i < buffers.length; i++) { + int size = buffers[i].capacity(); + if (size == this.poolableSize) { + buffers[i].clear(); + this.free.add(buffers[i]); + } else { + this.availableMemory += size; + } + Condition moreMem = this.waiters.peekFirst(); + if (moreMem != null) + moreMem.signal(); } - Condition moreMem = this.waiters.peekFirst(); - if (moreMem != null) - moreMem.signal(); } finally { lock.unlock(); } } - public void deallocate(ByteBuffer buffer) { - deallocate(buffer, buffer.capacity()); - } - /** * the total free memory both unallocated and in the free list */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 50bf95f..673b296 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -28,8 +28,8 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -91,26 +91,26 @@ public final class RecordAccumulator { private void registerMetrics(Metrics metrics) { metrics.addMetric("blocked_threads", - "The number of user threads blocked waiting for buffer memory to enqueue their records", - new Measurable() { + "The number of user threads blocked waiting for buffer memory to enqueue their records", + new Measurable() { public double measure(MetricConfig config, long now) { return free.queued(); } }); metrics.addMetric("buffer_total_bytes", - "The total amount of buffer memory that is available (not currently used for buffering records).", - new Measurable() { + "The total amount of buffer memory that is available (not currently used for buffering records).", + new Measurable() { public double measure(MetricConfig config, long now) { return free.totalMemory(); } }); metrics.addMetric("buffer_available_bytes", - "The total amount of buffer memory that is available (not currently used for buffering records).", - new Measurable() { - public double measure(MetricConfig config, long now) { - return free.availableMemory(); - } - }); + "The total amount of buffer memory that is available (not currently used for buffering records).", + new Measurable() { + public double measure(MetricConfig config, long now) { + return free.availableMemory(); + } + }); } /** @@ -132,7 +132,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch batch = dq.peekLast(); if (batch != null) { - FutureRecordMetadata future = batch.tryAppend(key, value, callback); + FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback); if (future != null) return future; } @@ -145,7 +145,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch last = dq.peekLast(); if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback); + 
FutureRecordMetadata future = last.tryAppend(key, value, compression, callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen // often... @@ -153,10 +153,8 @@ public final class RecordAccumulator { return future; } } - MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression); - RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); - FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); - + RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds()); + FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback)); dq.addLast(batch); return future; } @@ -195,7 +193,7 @@ public final class RecordAccumulator { RecordBatch batch = deque.peekFirst(); if (batch != null) { boolean backingOff = batch.attempts > 0 && batch.lastAttempt + retryBackoffMs > now; - boolean full = deque.size() > 1 || batch.records.isFull(); + boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining(); boolean expired = now - batch.created >= lingerMs; boolean sendable = full || expired || exhausted || closed; if (sendable && !backingOff) @@ -241,15 +239,10 @@ public final class RecordAccumulator { Deque deque = dequeFor(tp); if (deque != null) { synchronized (deque) { - RecordBatch first = deque.peekFirst(); - if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { - // there is a rare case that a single batch size is larger than the request size due - // to compression; in this case we will still eventually send this batch in a single - // request + if (size + deque.peekFirst().records.sizeInBytes() > maxSize) { return ready; } else { RecordBatch batch = deque.pollFirst(); - batch.records.close(); size += batch.records.sizeInBytes(); ready.add(batch); } @@ -276,7 +269,7 @@ public final class RecordAccumulator { * Deallocate the record batch */ public void deallocate(RecordBatch batch) { - free.deallocate(batch.records.buffer(), batch.records.capacity()); + free.deallocate(batch.records.buffer()); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 35f1d7a..038a05a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -17,6 +17,7 @@ import java.util.List; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,11 +54,11 @@ public final class RecordBatch { * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. 
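For orientation, the append path edited here follows a try/allocate/retry shape. The condensed Java sketch below is not the full method: the buffer-sizing line and the field names batchSize, free and time are assumptions, while the tryAppend and new RecordBatch calls match the patched hunk.

    // Condensed sketch of RecordAccumulator.append(tp, key, value, compression, callback).
    Deque<RecordBatch> dq = dequeFor(tp);
    // 1. Fast path: append to the last batch for this partition if it still has room.
    synchronized (dq) {
        RecordBatch last = dq.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(key, value, compression, callback);
            if (future != null)
                return future;
        }
    }
    // 2. Slow path: allocate a buffer (may block in the BufferPool), then re-check under the lock,
    //    since another thread may have created a batch in the meantime.
    int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); // assumed sizing
    ByteBuffer buffer = free.allocate(size);
    synchronized (dq) {
        RecordBatch last = dq.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(key, value, compression, callback);
            if (future != null)
                return future; // somebody else found us a batch; cleanup of the unused buffer is elided by the hunk
        }
        // 3. Start a new batch backed by the freshly allocated buffer.
        RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
        FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
        dq.addLast(batch);
        return future;
    }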
*/ - public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { + public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) { if (!this.records.hasRoomFor(key, value)) { return null; } else { - this.records.append(0L, key, value); + this.records.append(0L, key, value, compression); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) thunks.add(new Thunk(callback, future)); @@ -70,7 +71,7 @@ public final class RecordBatch { * Complete the request * * @param baseOffset The base offset of the messages assigned by the server - * @param exception The exception returned or null if no exception + * @param errorCode The error code or 0 if no error */ public void done(long baseOffset, RuntimeException exception) { this.produceFuture.done(topicPartition, baseOffset, exception); diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 05085e0..3ebbb80 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -29,8 +29,8 @@ import org.apache.kafka.common.record.Records; public class ProducerPerformance { public static void main(String[] args) throws Exception { - if (args.length < 5) { - System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks [compression_type]"); + if (args.length != 5) { + System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks"); System.exit(1); } String url = args[0]; @@ -45,8 +45,6 @@ public class ProducerPerformance { props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024)); props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024)); - if (args.length == 6) - props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]); KafkaProducer producer = new KafkaProducer(props); Callback callback = new Callback() { diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java deleted file mode 100644 index 12651d4..0000000 --- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
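With the optional compression argument gone, ProducerPerformance takes exactly five arguments. An illustrative invocation (the broker address and topic name are placeholders):

    java org.apache.kafka.clients.tools.ProducerPerformance localhost:9092 perf-test 1000000 100 1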
- */ -package org.apache.kafka.common.record; - -import java.io.InputStream; -import java.nio.ByteBuffer; - -/** - * A byte buffer backed input outputStream - */ -public class ByteBufferInputStream extends InputStream { - - private ByteBuffer buffer; - - public ByteBufferInputStream(ByteBuffer buffer) { - this.buffer = buffer; - } - - public int read() { - if (!buffer.hasRemaining()) { - return -1; - } - return buffer.get() & 0xFF; - } - - public int read(byte[] bytes, int off, int len) { - if (!buffer.hasRemaining()) { - return -1; - } - - len = Math.min(len, buffer.remaining()); - buffer.get(bytes, off, len); - return len; - } -} diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java deleted file mode 100644 index c7bd2f8..0000000 --- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.common.record; - -import java.io.OutputStream; -import java.nio.ByteBuffer; - -/** - * A byte buffer backed output outputStream - */ -public class ByteBufferOutputStream extends OutputStream { - - private static float REALLOCATION_FACTOR = 1.1f; - - private ByteBuffer buffer; - - public ByteBufferOutputStream(ByteBuffer buffer) { - this.buffer = buffer; - } - - public void write(int b) { - if (buffer.remaining() < 1) - expandBuffer(buffer.capacity() + 1); - buffer.put((byte) b); - } - - public void write(byte[] bytes, int off, int len) { - if (buffer.remaining() < len) - expandBuffer(buffer.capacity() + len); - buffer.put(bytes, off, len); - } - - public ByteBuffer buffer() { - return buffer; - } - - private void expandBuffer(int size) { - int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size); - ByteBuffer temp = ByteBuffer.allocate(expandSize); - temp.put(buffer.array(), buffer.arrayOffset(), buffer.position()); - buffer = temp; - } -} diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index c557e44..906da02 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -20,16 +20,14 @@ package org.apache.kafka.common.record; * The compression type to use */ public enum CompressionType { - NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f); + NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy"); public final int id; public final String name; - public final float rate; - private CompressionType(int id, String name, float rate) { + private CompressionType(int id, String name) { this.id = id; this.name = name; - this.rate = rate; } public static CompressionType forId(int id) { @@ -55,5 +53,4 @@ public enum CompressionType { else throw new IllegalArgumentException("Unknown compression name: " + name); } - } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java deleted file mode 100644 index 6ae3d06..0000000 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
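The Compressor whose deletion begins here wrote through the ByteBufferOutputStream removed above, whose expandBuffer keeps appends from overflowing the backing buffer. A minimal standalone illustration of that behavior, assuming the pre-revert sources where the class still exists:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ByteBufferOutputStream;

    public class ExpandingBufferExample {
        public static void main(String[] args) throws IOException {
            // Start deliberately small; the stream re-allocates a larger backing buffer as needed.
            ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(8));
            DataOutputStream data = new DataOutputStream(out);
            for (int i = 0; i < 100; i++)
                data.writeLong(i);                       // 800 bytes, far beyond the initial 8
            // buffer() returns the (possibly re-allocated) buffer; position() is the bytes written.
            System.out.println("capacity=" + out.buffer().capacity() + " written=" + out.buffer().position());
        }
    }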
- */ -package org.apache.kafka.common.record; - -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.utils.Utils; - -import java.io.InputStream; -import java.io.OutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -public class Compressor { - - static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f; - static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f; - static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024; - - private static float[] typeToRate; - private static int MAX_TYPE_ID = -1; - - static { - for (CompressionType type : CompressionType.values()) { - MAX_TYPE_ID = Math.max(MAX_TYPE_ID, type.id); - } - typeToRate = new float[MAX_TYPE_ID+1]; - for (CompressionType type : CompressionType.values()) { - typeToRate[type.id] = type.rate; - } - } - - private final CompressionType type; - private final DataOutputStream appendStream; - private final ByteBufferOutputStream bufferStream; - private final int initPos; - - public long writtenUncompressed; - public long numRecords; - - public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) { - this.type = type; - this.initPos = buffer.position(); - - this.numRecords = 0; - this.writtenUncompressed = 0; - - if (type != CompressionType.NONE) { - // for compressed records, leave space for the header and the shallow message metadata - // and move the starting position to the value payload offset - buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD); - } - - // create the stream - bufferStream = new ByteBufferOutputStream(buffer); - appendStream = wrapForOutput(bufferStream, type, blockSize); - } - - public Compressor(ByteBuffer buffer, CompressionType type) { - this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE); - } - - public ByteBuffer buffer() { - return bufferStream.buffer(); - } - - public void close() { - try { - appendStream.close(); - } catch (IOException e) { - throw new KafkaException(e); - } - - if (type != CompressionType.NONE) { - ByteBuffer buffer = bufferStream.buffer(); - int pos = buffer.position(); - // write the header, for the end offset write as number of records - 1 - buffer.position(initPos); - buffer.putLong(numRecords - 1); - buffer.putInt(pos - initPos - Records.LOG_OVERHEAD); - // write the shallow message (the crc and value size are not correct yet) - Record.write(buffer, null, null, type, 0, -1); - // compute the fill the value size - int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD; - buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET, valueSize); - // compute and fill the crc at the beginning of the message - long crc = Record.computeChecksum(buffer, - initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET, - pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET); - Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc); - // reset the position - buffer.position(pos); - - // update the compression ratio - float compressionRate = (float) buffer.position() / this.writtenUncompressed; - typeToRate[type.id] = typeToRate[type.id] * COMPRESSION_RATE_DAMPING_FACTOR + - compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR); - } - } - - // Note that for all the write operations below, IO exceptions should - // never be thrown since the underlying ByteBufferOutputStream does not 
throw IOException; - // therefore upon encountering this issue we just close the append stream. - - public void putLong(final long value) { - try { - appendStream.writeLong(value); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - public void putInt(final int value) { - try { - appendStream.writeInt(value); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - public void put(final ByteBuffer buffer) { - try { - appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit()); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - public void putByte(final byte value) { - try { - appendStream.write(value); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - public void put(final byte[] bytes, final int offset, final int len) { - try { - appendStream.write(bytes, offset, len); - } catch (IOException e) { - throw new KafkaException("I/O exception when writing to the append stream, closing", e); - } - } - - public void putRecord(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { - // put a record as un-compressed into the underlying stream - long crc = Record.computeChecksum(key, value, type, valueOffset, valueSize); - byte attributes = Record.computeAttributes(type); - putRecord(crc, attributes, key, value, valueOffset, valueSize); - } - - public void putRecord(byte[] key, byte[] value) { - putRecord(key, value, CompressionType.NONE, 0, -1); - } - - private void putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) { - Record.write(this, crc, attributes, key, value, valueOffset, valueSize); - } - - public void recordWritten(int size) { - numRecords += 1; - writtenUncompressed += size; - } - - public long estimatedBytesWritten() { - if (type == CompressionType.NONE) { - return bufferStream.buffer().position(); - } else { - // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes - return (long) (writtenUncompressed * typeToRate[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR); - } - } - - // the following two functions also need to be public since they are used in MemoryRecords.iteration - - static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) { - try { - switch (type) { - case NONE: - return new DataOutputStream(buffer); - case GZIP: - return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize)); - case SNAPPY: - // dynamically load the snappy class to avoid runtime dependency - // on snappy if we are not using it - try { - Class SnappyOutputStream = Class.forName("org.xerial.snappy.SnappyOutputStream"); - OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class, Integer.TYPE) - .newInstance(buffer, bufferSize); - return new DataOutputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } - default: - throw new IllegalArgumentException("Unknown compression type: " + type); - } - } catch (IOException e) { - throw new KafkaException(e); - } - } - - static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type) { - try { - switch (type) { - case NONE: - return new 
DataInputStream(buffer); - case GZIP: - return new DataInputStream(new GZIPInputStream(buffer)); - case SNAPPY: - // dynamically load the snappy class to avoid runtime dependency - // on snappy if we are not using it - try { - Class SnappyInputStream = Class.forName("org.xerial.snappy.SnappyInputStream"); - InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class) - .newInstance(buffer); - return new DataInputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } - default: - throw new IllegalArgumentException("Unknown compression type: " + type); - } - } catch (IOException e) { - throw new KafkaException(e); - } - } -} diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 428968c..9d8935f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -16,99 +16,53 @@ */ package org.apache.kafka.common.record; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.util.Iterator; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.AbstractIterator; + /** * A {@link Records} implementation backed by a ByteBuffer. */ public class MemoryRecords implements Records { - private final Compressor compressor; - private final int capacity; - private ByteBuffer buffer; - private boolean writable; - - // Construct a writable memory records - private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable) { - this.writable = writable; - this.capacity = buffer.capacity(); - if (this.writable) { - this.buffer = null; - this.compressor = new Compressor(buffer, type); - } else { - this.buffer = buffer; - this.compressor = null; - } - } + private final ByteBuffer buffer; - public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) { - return new MemoryRecords(buffer, type, true); + public MemoryRecords(int size) { + this(ByteBuffer.allocate(size)); } - public static MemoryRecords iterableRecords(ByteBuffer buffer) { - return new MemoryRecords(buffer, CompressionType.NONE, false); + public MemoryRecords(ByteBuffer buffer) { + this.buffer = buffer; } /** * Append the given record and offset to the buffer */ public void append(long offset, Record record) { - if (!writable) - throw new IllegalStateException("Memory records is not writable"); - - int size = record.size(); - compressor.putLong(offset); - compressor.putInt(size); - compressor.put(record.buffer()); - compressor.recordWritten(size + Records.LOG_OVERHEAD); + buffer.putLong(offset); + buffer.putInt(record.size()); + buffer.put(record.buffer()); record.buffer().rewind(); } /** * Append a new record and offset to the buffer */ - public void append(long offset, byte[] key, byte[] value) { - if (!writable) - throw new IllegalStateException("Memory records is not writable"); - - int size = Record.recordSize(key, value); - compressor.putLong(offset); - compressor.putInt(size); - compressor.putRecord(key, value); - compressor.recordWritten(size + Records.LOG_OVERHEAD); + public void append(long offset, byte[] key, byte[] value, CompressionType type) { + buffer.putLong(offset); + buffer.putInt(Record.recordSize(key, value)); + Record.write(this.buffer, key, value, type); } /** * Check if we have room for a 
new record containing the given key/value pair - * - * Note that the return value is based on the estimate of the bytes written to the compressor, - * which may not be accurate if compression is really used. When this happens, the following - * append may cause dynamic buffer re-allocation in the underlying byte buffer stream. */ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.writable && - this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value); - } - - public boolean isFull() { - return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten(); - } - - /** - * Close this batch for no more appends - */ - public void close() { - compressor.close(); - writable = false; - buffer = compressor.buffer(); + return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value); } /** Write the records in this set to the given channel */ @@ -120,14 +74,7 @@ public class MemoryRecords implements Records { * The size of this record set */ public int sizeInBytes() { - return compressor.buffer().position(); - } - - /** - * Return the capacity of the buffer - */ - public int capacity() { - return this.capacity; + return this.buffer.position(); } /** @@ -139,79 +86,34 @@ public class MemoryRecords implements Records { @Override public Iterator iterator() { - ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip(); - return new RecordsIterator(copy, CompressionType.NONE, false); + return new RecordsIterator(this.buffer); } + /* TODO: allow reuse of the buffer used for iteration */ public static class RecordsIterator extends AbstractIterator { private final ByteBuffer buffer; - private final DataInputStream stream; - private final CompressionType type; - private final boolean shallow; - private RecordsIterator innerIter; - - public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) { - this.type = type; - this.buffer = buffer; - this.shallow = shallow; - stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type); + + public RecordsIterator(ByteBuffer buffer) { + ByteBuffer copy = buffer.duplicate(); + copy.flip(); + this.buffer = copy; } - /* - * Read the next record from the buffer. - * - * Note that in the compressed message set, each message value size is set as the size - * of the un-compressed version of the message value, so when we do de-compression - * allocating an array of the specified size for reading compressed value data is sufficient. 
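After this patch MemoryRecords is a plain ByteBuffer-backed log: append writes the log overhead (offset and size) followed by the record, and the iterator walks a flipped duplicate of the buffer. A minimal usage sketch against the patched API; class and method names are taken from the hunks, and the Iterable<LogEntry> generic signature is assumed since the diff text drops angle brackets:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class MemoryRecordsExample {
        public static void main(String[] args) {
            MemoryRecords records = new MemoryRecords(ByteBuffer.allocate(1024));
            // Each append writes the offset, the record size and the record itself into the buffer.
            records.append(0L, "a".getBytes(), "1".getBytes(), CompressionType.NONE);
            records.append(1L, "b".getBytes(), "2".getBytes(), CompressionType.NONE);
            // Iteration replays the entries from a duplicate of the underlying buffer.
            for (LogEntry entry : records)
                System.out.println("offset=" + entry.offset() + " record=" + entry.record().size() + " bytes");
        }
    }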
- */ @Override protected LogEntry makeNext() { - if (innerDone()) { - try { - // read the offset - long offset = stream.readLong(); - // read record size - int size = stream.readInt(); - if (size < 0) - throw new IllegalStateException("Record with size " + size); - // read the record, if compression is used we cannot depend on size - // and hence has to do extra copy - ByteBuffer rec; - if (type == CompressionType.NONE) { - rec = buffer.slice(); - buffer.position(buffer.position() + size); - rec.limit(size); - } else { - byte[] recordBuffer = new byte[size]; - stream.read(recordBuffer, 0, size); - rec = ByteBuffer.wrap(recordBuffer); - } - LogEntry entry = new LogEntry(offset, new Record(rec)); - entry.record().ensureValid(); - - // decide whether to go shallow or deep iteration if it is compressed - CompressionType compression = entry.record().compressionType(); - if (compression == CompressionType.NONE || shallow) { - return entry; - } else { - // init the inner iterator with the value payload of the message, - // which will de-compress the payload to a set of messages - ByteBuffer value = entry.record().value(); - innerIter = new RecordsIterator(value, compression, true); - return innerIter.next(); - } - } catch (EOFException e) { - return allDone(); - } catch (IOException e) { - throw new KafkaException(e); - } - } else { - return innerIter.next(); - } - } - - private boolean innerDone() { - return (innerIter == null || !innerIter.hasNext()); + if (buffer.remaining() < Records.LOG_OVERHEAD) + return allDone(); + long offset = buffer.getLong(); + int size = buffer.getInt(); + if (size < 0) + throw new IllegalStateException("Record with size " + size); + if (buffer.remaining() < size) + return allDone(); + ByteBuffer rec = buffer.slice(); + rec.limit(size); + this.buffer.position(this.buffer.position() + size); + return new LogEntry(offset, new Record(rec)); } } + } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index ce1177e..f1dc977 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.record; import java.nio.ByteBuffer; -import org.apache.kafka.common.utils.Crc32; import org.apache.kafka.common.utils.Utils; @@ -41,15 +40,13 @@ public final class Record { public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH; public static final int VALUE_SIZE_LENGTH = 4; - /** - * The size for the record header - */ - public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH; + /** The amount of overhead bytes in a record */ + public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH; /** - * The amount of overhead bytes in a record + * The minimum valid size for the record header */ - public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; + public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; /** * The current "magic" value @@ -74,29 +71,27 @@ public final class Record { } /** - * A constructor to create a LogRecord. If the record's compression type is not none, then - * its value payload should be already compressed with the specified type; the constructor - * would always write the value payload as is and will not do the compression itself. 
+ * A constructor to create a LogRecord * * @param key The key of the record (null, if none) * @param value The record value - * @param type The compression type used on the contents of the record (if any) + * @param codec The compression codec used on the contents of the record (if any) * @param valueOffset The offset into the payload array used to extract payload * @param valueSize The size of the payload to use */ - public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { - this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, - value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset))); - write(this.buffer, key, value, type, valueOffset, valueSize); + public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { + this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize + : value.length - valueOffset))); + write(this.buffer, key, value, codec, valueOffset, valueSize); this.buffer.rewind(); } - public Record(byte[] key, byte[] value, CompressionType type) { - this(key, value, type, 0, -1); + public Record(byte[] key, byte[] value, CompressionType codec) { + this(key, value, codec, 0, -1); } - public Record(byte[] value, CompressionType type) { - this(null, value, type); + public Record(byte[] value, CompressionType codec) { + this(null, value, codec); } public Record(byte[] key, byte[] value) { @@ -107,37 +102,40 @@ public final class Record { this(null, value, CompressionType.NONE); } - // Write a record to the buffer, if the record's compression type is none, then - // its value payload should be already compressed with the specified type - public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { - // construct the compressor with compression type none since this function will not do any - //compression according to the input type, it will just write the record's payload as is - Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity()); - compressor.putRecord(key, value, type, valueOffset, valueSize); - } - - public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) { - // write crc - compressor.putInt((int) (crc & 0xffffffffL)); - // write magic value - compressor.putByte(CURRENT_MAGIC_VALUE); - // write attributes - compressor.putByte(attributes); + public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { + // skip crc, we will fill that in at the end + int pos = buffer.position(); + buffer.position(pos + MAGIC_OFFSET); + buffer.put(CURRENT_MAGIC_VALUE); + byte attributes = 0; + if (codec.id > 0) + attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id)); + buffer.put(attributes); // write the key if (key == null) { - compressor.putInt(-1); + buffer.putInt(-1); } else { - compressor.putInt(key.length); - compressor.put(key, 0, key.length); + buffer.putInt(key.length); + buffer.put(key, 0, key.length); } // write the value if (value == null) { - compressor.putInt(-1); + buffer.putInt(-1); } else { int size = valueSize >= 0 ? 
valueSize : (value.length - valueOffset); - compressor.putInt(size); - compressor.put(value, valueOffset, size); + buffer.putInt(size); + buffer.put(value, valueOffset, size); } + + // now compute the checksum and fill it in + long crc = computeChecksum(buffer, + buffer.arrayOffset() + pos + MAGIC_OFFSET, + buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset()); + Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc); + } + + public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) { + write(buffer, key, value, codec, 0, -1); } public static int recordSize(byte[] key, byte[] value) { @@ -152,51 +150,13 @@ public final class Record { return this.buffer; } - public static byte computeAttributes(CompressionType type) { - byte attributes = 0; - if (type.id > 0) - attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id)); - return attributes; - } - /** * Compute the checksum of the record from the record contents */ public static long computeChecksum(ByteBuffer buffer, int position, int size) { - Crc32 crc = new Crc32(); - crc.update(buffer.array(), buffer.arrayOffset() + position, size); - return crc.getValue(); - } - - /** - * Compute the checksum of the record from the attributes, key and value payloads - */ - public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { - Crc32 crc = new Crc32(); - crc.update(CURRENT_MAGIC_VALUE); - byte attributes = 0; - if (type.id > 0) - attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id)); - crc.update(attributes); - // update for the key - if (key == null) { - crc.updateInt(-1); - } else { - crc.updateInt(key.length); - crc.update(key, 0, key.length); - } - // update for the value - if (value == null) { - crc.updateInt(-1); - } else { - int size = valueSize >= 0 ? 
valueSize : (value.length - valueOffset); - crc.updateInt(size); - crc.update(value, valueOffset, size); - } - return crc.getValue(); + return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset()); } - /** * Compute the checksum of the record from the record contents */ @@ -279,7 +239,7 @@ public final class Record { } /** - * The compression type used with this record + * The compression codec used with this record */ public CompressionType compressionType() { return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java index 047ca98..153c5a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java @@ -28,30 +28,6 @@ import java.util.zip.Checksum; */ public class Crc32 implements Checksum { - /** - * Compute the CRC32 of the byte array - * - * @param bytes The array to compute the checksum for - * @return The CRC32 - */ - public static long crc32(byte[] bytes) { - return crc32(bytes, 0, bytes.length); - } - - /** - * Compute the CRC32 of the segment of the byte array given by the specified size and offset - * - * @param bytes The bytes to checksum - * @param offset the offset at which to begin checksumming - * @param size the number of bytes to checksum - * @return The CRC32 - */ - public static long crc32(byte[] bytes, int offset, int size) { - Crc32 crc = new Crc32(); - crc.update(bytes, offset, size); - return crc.getValue(); - } - /** the current CRC value, bit-flipped */ private int crc; @@ -121,18 +97,6 @@ public class Crc32 implements Checksum { crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)]; } - /** - * Update the CRC32 given an integer - */ - final public void updateInt(int input) { - update((byte) (input >> 24)); - update((byte) (input >> 16)); - update((byte) (input >> 8)); - update((byte) input /* >> 0 */); - } - - - /* * CRC-32 lookup tables generated by the polynomial 0xEDB88320. See also TestPureJavaCrc32.Table. */ diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 50af601..0c6b365 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -92,6 +92,30 @@ public class Utils { } /** + * Compute the CRC32 of the byte array + * + * @param bytes The array to compute the checksum for + * @return The CRC32 + */ + public static long crc32(byte[] bytes) { + return crc32(bytes, 0, bytes.length); + } + + /** + * Compute the CRC32 of the segment of the byte array given by the specificed size and offset + * + * @param bytes The bytes to checksum + * @param offset the offset at which to begin checksumming + * @param size the number of bytes to checksum + * @return The CRC32 + */ + public static long crc32(byte[] bytes, int offset, int size) { + Crc32 crc = new Crc32(); + crc.update(bytes, offset, size); + return crc.getValue(); + } + + /** * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!). 
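The crc32 helpers moved into Utils here are thin wrappers over the existing Crc32 class, so callers such as the rewritten Record.computeChecksum no longer need the removed static methods on Crc32. A small self-contained check of the two overloads added in this hunk:

    import org.apache.kafka.common.utils.Utils;

    public class CrcExample {
        public static void main(String[] args) {
            byte[] bytes = "Any String you want".getBytes();
            // The whole-array overload and the explicit-range overload agree.
            long whole = Utils.crc32(bytes);
            long ranged = Utils.crc32(bytes, 0, bytes.length);
            System.out.println(whole == ranged);   // prints true
        }
    }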
*/ diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 94a1112..b0745b5 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -22,35 +22,29 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import org.apache.kafka.common.record.LogEntry; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Record; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -@RunWith(value = Parameterized.class) public class MemoryRecordsTest { - private CompressionType compression; - - public MemoryRecordsTest(CompressionType compression) { - this.compression = compression; - } - @Test public void testIterator() { - MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression); - MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression); + MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024)); + MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024)); List list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()), new Record("b".getBytes(), "2".getBytes()), new Record("c".getBytes(), "3".getBytes())); for (int i = 0; i < list.size(); i++) { Record r = list.get(i); recs1.append(i, r); - recs2.append(i, toArray(r.key()), toArray(r.value())); + recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType()); } - recs1.close(); - recs2.close(); for (int iteration = 0; iteration < 2; iteration++) { for (MemoryRecords recs : Arrays.asList(recs1, recs2)) { @@ -60,18 +54,10 @@ public class MemoryRecordsTest { LogEntry entry = iter.next(); assertEquals((long) i, entry.offset()); assertEquals(list.get(i), entry.record()); - entry.record().ensureValid(); } assertFalse(iter.hasNext()); } } } - @Parameterized.Parameters - public static Collection data() { - List values = new ArrayList(); - for (CompressionType type: CompressionType.values()) - values.add(new Object[] { type }); - return values; - } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java index 2765913..ae54d67 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java @@ -27,6 +27,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.InvalidRecordException; +import org.apache.kafka.common.record.Record; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -63,10 +66,6 @@ public class RecordTest { @Test public void testChecksum() { assertEquals(record.checksum(), record.computeChecksum()); - assertEquals(record.checksum(), record.computeChecksum( - this.key == null ? null : this.key.array(), - this.value == null ? 
null : this.value.array(), - this.compression, 0, -1)); assertTrue(record.isValid()); for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) { Record copy = copyOf(record); @@ -96,11 +95,9 @@ public class RecordTest { @Parameters public static Collection data() { - byte[] payload = new byte[1000]; - Arrays.fill(payload, (byte) 1); List values = new ArrayList(); - for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload)) - for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload)) + for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes())) + for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes())) for (CompressionType compression : CompressionType.values()) values.add(new Object[] { key, value, compression }); return values; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java deleted file mode 100644 index 6b32381..0000000 --- a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
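The parameterized RecordTest above exercises the Record constructors roughly as in this minimal standalone check; the key, value and codec are arbitrary sample inputs:

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Record;

    public class RecordExample {
        public static void main(String[] args) {
            Record r = new Record("key".getBytes(), "value".getBytes(), CompressionType.NONE);
            // The constructor writes the header and fills in the CRC, so the record validates.
            System.out.println("valid=" + r.isValid() + " size=" + r.size() + " codec=" + r.compressionType());
        }
    }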
- */ -package org.apache.kafka.common.utils; - -import static org.junit.Assert.assertEquals; -import org.junit.Test; - -import java.nio.ByteBuffer; - -public class CrcTest { - - @Test - public void testUpdate() { - final byte bytes[] = "Any String you want".getBytes(); - final int len = bytes.length; - - Crc32 crc1 = new Crc32(); - Crc32 crc2 = new Crc32(); - Crc32 crc3 = new Crc32(); - - crc1.update(bytes, 0, len); - for(int i = 0; i < len; i++) - crc2.update(bytes[i]); - crc3.update(bytes, 0, len/2); - crc3.update(bytes, len/2, len-len/2); - - assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue()); - assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue()); - } - - @Test - public void testUpdateInt() { - final int value = 1000; - final ByteBuffer buffer = ByteBuffer.allocate(4); - buffer.putInt(value); - - Crc32 crc1 = new Crc32(); - Crc32 crc2 = new Crc32(); - - crc1.updateInt(value); - crc2.update(buffer.array(), buffer.arrayOffset(), 4); - - assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue()); - } -} diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 76a17e8..36cfc0f 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -88,7 +88,7 @@ public class TestUtils { /** * Generate an array of random bytes * - * @param size The size of the array + * @param numBytes The size of the array */ public static byte[] randomBytes(int size) { byte[] bytes = new byte[size]; diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7dc2718..c8a56ee 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -37,9 +37,9 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} import java.util.concurrent.atomic.AtomicInteger import org.apache.log4j.Logger +import java.util.concurrent.locks.ReentrantLock import scala.Some import kafka.common.TopicAndPartition -import java.util.concurrent.locks.ReentrantLock class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { @@ -643,15 +643,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def shutdown() = { inLock(controllerContext.controllerLock) { isRunning = false - partitionStateMachine.shutdown() - replicaStateMachine.shutdown() - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() - if(controllerContext.controllerChannelManager != null) { - controllerContext.controllerChannelManager.shutdown() - controllerContext.controllerChannelManager = null - } - info("Controller shutdown complete") + onControllerResignation() } } diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 488dfd0..43313ff 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -22,6 +22,7 @@ import kafka.utils.Utils._ import collection.Set import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.api.{StopReplicaResponse, RequestOrResponse} +import java.util.concurrent.locks.ReentrantLock /** * This manages the state machine 
for topic deletion. @@ -71,9 +72,10 @@ class TopicDeletionManager(controller: KafkaController, val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted + val deleteLock = new ReentrantLock() var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted) - val deleteTopicsCond = controllerContext.controllerLock.newCondition() + val deleteTopicsCond = deleteLock.newCondition() var deleteTopicStateChanged: Boolean = false var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable @@ -195,11 +197,13 @@ class TopicDeletionManager(controller: KafkaController, * controllerLock should be acquired before invoking this API */ private def awaitTopicDeletionNotification() { - while(!deleteTopicStateChanged) { - info("Waiting for signal to start or continue topic deletion") - deleteTopicsCond.await() + inLock(deleteLock) { + while(!deleteTopicStateChanged) { + info("Waiting for signal to start or continue topic deletion") + deleteTopicsCond.await() + } + deleteTopicStateChanged = false } - deleteTopicStateChanged = false } /** @@ -207,7 +211,9 @@ class TopicDeletionManager(controller: KafkaController, */ private def resumeTopicDeletionThread() { deleteTopicStateChanged = true - deleteTopicsCond.signal() + inLock(deleteLock) { + deleteTopicsCond.signal() + } } /** @@ -352,8 +358,9 @@ class TopicDeletionManager(controller: KafkaController, class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { val zkClient = controllerContext.zkClient override def doWork() { + awaitTopicDeletionNotification() + inLock(controllerContext.controllerLock) { - awaitTopicDeletionNotification() val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted if(topicsQueuedForDeletion.size > 0) info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(",")) diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 57386b1..dd39ff2 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -255,8 +255,8 @@ object ConsoleProducer { class NewShinyProducer(producerConfig: ProducerConfig) extends Producer { val props = new Properties() props.put("metadata.broker.list", producerConfig.brokerList) - val compression = if(producerConfig.compress) DefaultCompressionCodec.name else NoCompressionCodec.name - props.put("compression.type", compression) + val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec + props.put("compression.codec", codec.toString) props.put("send.buffer.bytes", producerConfig.socketBuffer.toString) props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString) props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString) diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala deleted file mode 100644 index 1d73aca..0000000 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
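The TopicDeletionManager change above moves the delete-notification condition onto a dedicated deleteLock so the delete-topics thread no longer waits while holding the controller lock. A Java rendering of the resulting wait/signal pattern, offered only as a sketch of the pattern (the Scala hunk sets the flag just before taking the lock rather than inside it):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class DeleteNotification {
        private final ReentrantLock deleteLock = new ReentrantLock();
        private final Condition deleteTopicsCond = deleteLock.newCondition();
        private boolean deleteTopicStateChanged = false;

        // Called by the delete-topics thread: block until somebody signals new work.
        public void awaitTopicDeletionNotification() throws InterruptedException {
            deleteLock.lock();
            try {
                while (!deleteTopicStateChanged)
                    deleteTopicsCond.await();
                deleteTopicStateChanged = false;
            } finally {
                deleteLock.unlock();
            }
        }

        // Called by the controller path: flag the state change and wake the waiter.
        public void resumeTopicDeletionThread() {
            deleteLock.lock();
            try {
                deleteTopicStateChanged = true;
                deleteTopicsCond.signal();
            } finally {
                deleteLock.unlock();
            }
        }
    }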
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
deleted file mode 100644
index 1d73aca..0000000
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api.test
-
-import java.util.{Properties, Collection, ArrayList}
-
-import org.scalatest.junit.JUnit3Suite
-import org.junit.runners.Parameterized
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized.Parameters
-import org.junit.{After, Before, Test}
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
-import org.junit.Assert._
-
-import kafka.api.FetchRequestBuilder
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.consumer.SimpleConsumer
-import kafka.message.Message
-import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{Utils, TestUtils}
-
-import scala.Array
-
-
-@RunWith(value = classOf[Parameterized])
-class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
-  private val brokerId = 0
-  private val port = TestUtils.choosePort
-  private var server: KafkaServer = null
-
-  private val props = TestUtils.createBrokerConfig(brokerId, port)
-  private val config = new KafkaConfig(props)
-
-  private val topic = "topic"
-  private val numRecords = 100
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    server = TestUtils.createServer(config)
-  }
-
-  @After
-  override def tearDown() {
-    server.shutdown
-    Utils.rm(server.config.logDirs)
-    super.tearDown()
-  }
-
-  /**
-   * testCompression
-   *
-   * Compressed messages should be able to sent and consumed correctly
-   */
-  @Test
-  def testCompression() {
-
-    val props = new Properties()
-    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
-    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
-    var producer = new KafkaProducer(props)
-
-    try {
-      // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 1, List(server))
-      val partition = 0
-
-      // prepare the messages
-      val messages = for (i <-0 until numRecords)
-        yield ("value" + i).getBytes
-
-      // make sure the returned messages are correct
-      val responses = for (message <- messages)
-        yield producer.send(new ProducerRecord(topic, null, null, message))
-      val futures = responses.toList
-      for ((future, offset) <- futures zip (0 until numRecords)) {
-        assertEquals(offset.toLong, future.get.offset)
-      }
-
-      // make sure the fetched message count match
-      val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
-      val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
-      val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer
-      assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size)
-
-      var index = 0
-      for (message <- messages) {
-        assertEquals(new Message(bytes = message), messageSet(index).message)
-        assertEquals(index.toLong, messageSet(index).offset)
-        index += 1
-      }
-    } finally {
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
-    }
-  }
-}
-
-object ProducerCompressionTest {
-
-  // NOTE: Must return collection of Array[AnyRef] (NOT Array[Any]).
-  @Parameters
-  def parameters: Collection[Array[String]] = {
-    val list = new ArrayList[Array[String]]()
-    list.add(Array("gzip"))
-    list.add(Array("snappy"))
-    list
-  }
-}
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 525a060..c002f5e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -319,6 +319,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
+
     val producer = new KafkaProducer(producerProps)

     override def doWork(): Unit = {
@@ -334,10 +335,5 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
         case e : Exception => failed = true
       }
     }
-
-    override def shutdown(){
-      super.shutdown()
-      producer.close
-    }
   }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 3c37330..66ea76b 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,14 +17,6 @@

 package kafka.api.test

-import java.util.Properties
-import java.lang.{Integer, IllegalArgumentException}
-
-import org.apache.kafka.clients.producer._
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Test
-import org.junit.Assert._
-
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
@@ -32,6 +24,15 @@ import kafka.consumer.SimpleConsumer
 import kafka.api.FetchRequestBuilder
 import kafka.message.Message

+import org.apache.kafka.clients.producer._
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
+import org.junit.Assert._
+
+import java.util.Properties
+import java.lang.{Integer, IllegalArgumentException}
+
 class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {

   private val brokerId1 = 0
@@ -75,10 +76,15 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     super.tearDown()
   }

-  class CheckErrorCallback extends Callback {
+  class PrintOffsetCallback extends Callback {
     def onCompletion(metadata: RecordMetadata, exception: Exception) {
       if (exception != null)
         fail("Send callback returns the following exception", exception)
+      try {
+        System.out.println("The message we just sent is marked as [" + metadata.partition + "] : " + metadata.offset);
+      } catch {
+        case e: Throwable => fail("Should succeed sending the message", e)
+      }
     }
   }

@@ -94,7 +100,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     var producer = new KafkaProducer(props)

-    val callback = new CheckErrorCallback
+    val callback = new PrintOffsetCallback

     try {
       // create topic
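The ProducerSendTest hunk above swaps CheckErrorCallback for a PrintOffsetCallback that prints the partition and offset of every acknowledged record. For comparison, a sketch of a callback against the same Callback interface that records the completion instead of printing it; the class name and fields are illustrative, not code from this patch:

    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

    // Illustrative callback: keep the last acknowledged offset and any send error
    // so a test can assert on them after the sends complete.
    class RecordingCallback extends Callback {
      @volatile var lastOffset: Long = -1L
      @volatile var error: Exception = null

      def onCompletion(metadata: RecordMetadata, exception: Exception) {
        if (exception != null)
          error = exception            // the send failed; remember the exception
        else
          lastOffset = metadata.offset // the send succeeded; metadata carries partition and offset
      }
    }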
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 67497dd..6061ede 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -139,13 +139,27 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
   @Test
   def testLog4jAppends() {
-    PropertyConfigurator.configure(getLog4jConfig)
+    val topic = TestUtils.tempTopic()
+    PropertyConfigurator.configure({
+      var props = new Properties()
+      props.put("log4j.rootLogger", "INFO")
+      props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+      props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+      props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
+      props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+      props.put("log4j.appender.KAFKA.Topic", topic)
+      props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
+      props.put("log4j.appender.KAFKA.requiredNumAcks", "1")
+      props
+    })

     for(i <- 1 to 5) info("test")

-    val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
-    val fetchMessage = response.messageSet("test-topic", 0)
+    Thread.sleep(100) // Allow messages to be ready for consuming
+
+    val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 1024*1024).build())
+    val fetchMessage = response.messageSet(topic, 0)

     var count = 0
     for(message <- fetchMessage) {
@@ -155,19 +169,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     assertEquals(5, count)
   }

-  private def getLog4jConfig: Properties = {
-    var props = new Properties()
-    props.put("log4j.rootLogger", "INFO")
-    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
-    props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
-    props.put("log4j.appender.KAFKA.requiredNumAcks", "1")
-    props
   }
-}

 class AppenderStringEncoder(encoding: String = "UTF-8") extends Encoder[LoggingEvent] {
   def toBytes(event: LoggingEvent): Array[Byte] = {
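The Thread.sleep(100) added above gives the appender time to publish before the fetch. If that proves timing-sensitive, a bounded poll is one alternative; the helper below is a sketch, not part of this patch, and assumes the fetch can simply be retried:

    object FetchPolling {
      // Illustrative alternative to a fixed sleep: retry until the expected number of
      // messages is visible or a deadline passes. fetchCount stands in for the
      // SimpleConsumer / FetchRequestBuilder plumbing already used by the test above.
      def waitForMessages(expected: Int, timeoutMs: Long)(fetchCount: () => Int): Boolean = {
        val deadline = System.currentTimeMillis() + timeoutMs
        var seen = fetchCount()
        while (seen < expected && System.currentTimeMillis() < deadline) {
          Thread.sleep(50)
          seen = fetchCount()
        }
        seen >= expected
      }
    }

kafka.utils.TestUtils.waitUntilTrue, if present on this branch, serves the same purpose.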
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b5936d4..e69e0fe 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -31,16 +31,16 @@ class ReplicaManagerTest extends JUnit3Suite {
   @Test
   def testHighwaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1)
-    val dir = "/tmp/kafka-logs/"
-    new File(dir).mkdir()
-    props.setProperty("log.dirs", dir)
+    val dir = TestUtils.tempDir()
+    props.setProperty("log.dirs", dir.getAbsolutePath + File.separator)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = EasyMock.createMock(classOf[LogManager])
     val time: MockTime = new MockTime()
+    val topic = TestUtils.tempTopic()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
-    val partition = rm.getOrCreatePartition("test-topic", 1, 1)
-    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null))))
+    val partition = rm.getOrCreatePartition(topic, 1, 1)
+    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File(dir, topic), new LogConfig(), 0L, null))))
     rm.checkpointHighWatermarks()
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 20fe93e..c7e058f 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -96,5 +96,25 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer.close()
     server.shutdown()
     Utils.rm(server.config.logDirs)
+    verifyNonDaemonThreadsStatus
+  }
+
+  @Test
+  def testCleanShutdownWithDeleteTopicEnabled() {
+    val newProps = TestUtils.createBrokerConfig(0, port)
+    newProps.setProperty("delete.topic.enable", "true")
+    val newConfig = new KafkaConfig(newProps)
+    var server = new KafkaServer(newConfig)
+    server.startup()
+    server.shutdown()
+    server.awaitShutdown()
+    Utils.rm(server.config.logDirs)
+    verifyNonDaemonThreadsStatus
+  }
+
+  def verifyNonDaemonThreadsStatus() {
+    assertEquals(0, Thread.getAllStackTraces.keySet().toArray
+      .map(_.asInstanceOf[Thread])
+      .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka")))
   }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2054c25..799668d 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -88,6 +88,8 @@ object TestUtils extends Logging {
     f
   }

+  def tempTopic(): String = "test-topic-" + random.nextInt(1000000)
+
   /**
    * Create a temporary file
    */
diff --git a/gradle.properties b/gradle.properties
index 4827769..236e243 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -15,7 +15,7 @@
 group=org.apache.kafka
 version=0.8.1
-scalaVersion=2.8.0
+scalaVersion=2.9.2
 task=build
 mavenUrl=
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 9e4ebaf..3df0d13 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -212,7 +212,6 @@ object ProducerPerformance extends Logging {
     props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
     props.put("request.retries", config.producerNumRetries.toString)
     props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
-    props.put("compression.type", config.compressionCodec.name)
     val producer = new KafkaProducer(props)

     def send(topic: String, partition: Long, bytes: Array[Byte]) {
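verifyNonDaemonThreadsStatus above keys its leak check on each thread's class name. A sketch of an alternative that filters on thread names instead, which would also cover plain java.lang.Thread instances; the "kafka" name prefix is an assumption about broker thread naming, not something this patch establishes:

    import scala.collection.JavaConverters._

    object ThreadLeakCheck {
      // Illustrative variant of the check above, keyed on thread names rather than
      // thread classes; counts live non-daemon threads whose name starts with "kafka".
      def liveKafkaNonDaemonThreads: Int =
        Thread.getAllStackTraces.keySet.asScala
          .count(t => !t.isDaemon && t.isAlive && t.getName.toLowerCase.startsWith("kafka"))
    }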