diff --git a/build.gradle b/build.gradle index d6fd287..5432c0c 100644 --- a/build.gradle +++ b/build.gradle @@ -151,7 +151,6 @@ project(':core') { compile 'com.101tec:zkclient:0.3' compile 'com.yammer.metrics:metrics-core:2.2.0' compile 'net.sf.jopt-simple:jopt-simple:3.2' - compile 'org.xerial.snappy:snappy-java:1.0.5' testCompile 'junit:junit:4.1' testCompile 'org.easymock:easymock:3.0' @@ -317,6 +316,8 @@ project(':clients') { dependencies { compile "org.slf4j:slf4j-api:1.7.6" + compile 'org.xerial.snappy:snappy-java:1.0.5' + testCompile 'com.novocode:junit-interface:0.9' testRuntime "$slf4jlog4j" } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1ac6943..1ff9174 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -69,6 +69,7 @@ public class KafkaProducer implements Producer { private final Sender sender; private final Metrics metrics; private final Thread ioThread; + private final CompressionType compressionType; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -99,6 +100,7 @@ public class KafkaProducer implements Producer { config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG); + this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), @@ -224,7 +226,7 @@ public class KafkaProducer implements Producer { ensureValidSize(record.key(), record.value()); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); + FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback); this.sender.wakeup(); return future; // For API exceptions return them in the future; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 32e12ad..48706ba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -136,6 +136,11 @@ public class ProducerConfig extends AbstractConfig { public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; /** + * The compression type for all data generated. The default is none (i.e. no compression) + */ + public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; + + /** * Should we register the Kafka metrics as JMX mbeans? 
*/ public static final String ENABLE_JMX_CONFIG = "enable.jmx"; @@ -158,9 +163,10 @@ public class ProducerConfig extends AbstractConfig { .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") - .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "") - .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah"); + .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah") + .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah") + .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, ""); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index b69866a..d1d6c4b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -71,7 +71,7 @@ public final class BufferPool { * @param size The buffer size to allocate in bytes * @return The buffer * @throws InterruptedException If the thread is interrupted while blocked - * @throws IllegalArgument if size is larger than the total memory controlled by the pool (and hence we would block + * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block * forever) * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool */ @@ -167,28 +167,31 @@ public final class BufferPool { * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the * memory as free. * - * @param buffers The buffers to return + * @param buffer The buffer to return + * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity + * since the buffer may re-allocate itself during in-place compression */ - public void deallocate(ByteBuffer... 
buffers) { + public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { - for (int i = 0; i < buffers.length; i++) { - int size = buffers[i].capacity(); - if (size == this.poolableSize) { - buffers[i].clear(); - this.free.add(buffers[i]); - } else { - this.availableMemory += size; - } - Condition moreMem = this.waiters.peekFirst(); - if (moreMem != null) - moreMem.signal(); + if (size == this.poolableSize && size == buffer.capacity()) { + buffer.clear(); + this.free.add(buffer); + } else { + this.availableMemory += size; } + Condition moreMem = this.waiters.peekFirst(); + if (moreMem != null) + moreMem.signal(); } finally { lock.unlock(); } } + public void deallocate(ByteBuffer buffer) { + deallocate(buffer, buffer.capacity()); + } + /** * the total free memory both unallocated and in the free list */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 673b296..50bf95f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -28,8 +28,8 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -91,26 +91,26 @@ public final class RecordAccumulator { private void registerMetrics(Metrics metrics) { metrics.addMetric("blocked_threads", - "The number of user threads blocked waiting for buffer memory to enqueue their records", - new Measurable() { + "The number of user threads blocked waiting for buffer memory to enqueue their records", + new Measurable() { public double measure(MetricConfig config, long now) { return free.queued(); } }); metrics.addMetric("buffer_total_bytes", - "The total amount of buffer memory that is available (not currently used for buffering records).", - new Measurable() { + "The total amount of buffer memory that is available (not currently used for buffering records).", + new Measurable() { public double measure(MetricConfig config, long now) { return free.totalMemory(); } }); metrics.addMetric("buffer_available_bytes", - "The total amount of buffer memory that is available (not currently used for buffering records).", - new Measurable() { - public double measure(MetricConfig config, long now) { - return free.availableMemory(); - } - }); + "The total amount of buffer memory that is available (not currently used for buffering records).", + new Measurable() { + public double measure(MetricConfig config, long now) { + return free.availableMemory(); + } + }); } /** @@ -132,7 +132,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch batch = dq.peekLast(); if (batch != null) { - FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback); + FutureRecordMetadata future = batch.tryAppend(key, value, callback); if (future != null) return future; } @@ -145,7 +145,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch last = dq.peekLast(); if (last != null) { - FutureRecordMetadata future 
= last.tryAppend(key, value, compression, callback); + FutureRecordMetadata future = last.tryAppend(key, value, callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen // often... @@ -153,8 +153,10 @@ public final class RecordAccumulator { return future; } } - RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds()); - FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback)); + MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression); + RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); + FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); + dq.addLast(batch); return future; } @@ -193,7 +195,7 @@ public final class RecordAccumulator { RecordBatch batch = deque.peekFirst(); if (batch != null) { boolean backingOff = batch.attempts > 0 && batch.lastAttempt + retryBackoffMs > now; - boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining(); + boolean full = deque.size() > 1 || batch.records.isFull(); boolean expired = now - batch.created >= lingerMs; boolean sendable = full || expired || exhausted || closed; if (sendable && !backingOff) @@ -239,10 +241,15 @@ public final class RecordAccumulator { Deque deque = dequeFor(tp); if (deque != null) { synchronized (deque) { - if (size + deque.peekFirst().records.sizeInBytes() > maxSize) { + RecordBatch first = deque.peekFirst(); + if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { + // there is a rare case that a single batch size is larger than the request size due + // to compression; in this case we will still eventually send this batch in a single + // request return ready; } else { RecordBatch batch = deque.pollFirst(); + batch.records.close(); size += batch.records.sizeInBytes(); ready.add(batch); } @@ -269,7 +276,7 @@ public final class RecordAccumulator { * Deallocate the record batch */ public void deallocate(RecordBatch batch) { - free.deallocate(batch.records.buffer()); + free.deallocate(batch.records.buffer(), batch.records.capacity()); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 038a05a..35f1d7a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -17,7 +17,6 @@ import java.util.List; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,11 +53,11 @@ public final class RecordBatch { * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. 
*/ - public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) { + public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { if (!this.records.hasRoomFor(key, value)) { return null; } else { - this.records.append(0L, key, value, compression); + this.records.append(0L, key, value); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) thunks.add(new Thunk(callback, future)); @@ -71,7 +70,7 @@ public final class RecordBatch { * Complete the request * * @param baseOffset The base offset of the messages assigned by the server - * @param errorCode The error code or 0 if no error + * @param exception The exception returned or null if no exception */ public void done(long baseOffset, RuntimeException exception) { this.produceFuture.done(topicPartition, baseOffset, exception); diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 3ebbb80..05085e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -29,8 +29,8 @@ import org.apache.kafka.common.record.Records; public class ProducerPerformance { public static void main(String[] args) throws Exception { - if (args.length != 5) { - System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks"); + if (args.length < 5) { + System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks [compression_type]"); System.exit(1); } String url = args[0]; @@ -45,6 +45,8 @@ public class ProducerPerformance { props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024)); props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024)); + if (args.length == 6) + props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]); KafkaProducer producer = new KafkaProducer(props); Callback callback = new Callback() { diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java new file mode 100644 index 0000000..12651d4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.common.record;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed input stream
+ */
+public class ByteBufferInputStream extends InputStream {
+
+    private ByteBuffer buffer;
+
+    public ByteBufferInputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public int read() {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+        return buffer.get() & 0xFF;
+    }
+
+    public int read(byte[] bytes, int off, int len) {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buffer.remaining());
+        buffer.get(bytes, off, len);
+        return len;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
new file mode 100644
index 0000000..c7bd2f8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed output stream
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+    private static float REALLOCATION_FACTOR = 1.1f;
+
+    private ByteBuffer buffer;
+
+    public ByteBufferOutputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public void write(int b) {
+        if (buffer.remaining() < 1)
+            expandBuffer(buffer.capacity() + 1);
+        buffer.put((byte) b);
+    }
+
+    public void write(byte[] bytes, int off, int len) {
+        if (buffer.remaining() < len)
+            expandBuffer(buffer.capacity() + len);
+        buffer.put(bytes, off, len);
+    }
+
+    public ByteBuffer buffer() {
+        return buffer;
+    }
+
+    private void expandBuffer(int size) {
+        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+        ByteBuffer temp = ByteBuffer.allocate(expandSize);
+        temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+        buffer = temp;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 906da02..c557e44 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,14 +20,16 @@ package org.apache.kafka.common.record;
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy");
+    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f);
 
     public final int id;
     public final String name;
+    public final float rate;
 
-    private CompressionType(int id, String name) {
+    private CompressionType(int id, String name, float rate) {
         this.id = id;
         this.name = name;
+        this.rate = rate;
     }
 
     public static CompressionType forId(int id) {
@@ -53,4 +55,5 @@ public enum CompressionType {
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
new file mode 100644
index 0000000..6ae3d06
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.Utils; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class Compressor { + + static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f; + static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f; + static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024; + + private static float[] typeToRate; + private static int MAX_TYPE_ID = -1; + + static { + for (CompressionType type : CompressionType.values()) { + MAX_TYPE_ID = Math.max(MAX_TYPE_ID, type.id); + } + typeToRate = new float[MAX_TYPE_ID+1]; + for (CompressionType type : CompressionType.values()) { + typeToRate[type.id] = type.rate; + } + } + + private final CompressionType type; + private final DataOutputStream appendStream; + private final ByteBufferOutputStream bufferStream; + private final int initPos; + + public long writtenUncompressed; + public long numRecords; + + public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) { + this.type = type; + this.initPos = buffer.position(); + + this.numRecords = 0; + this.writtenUncompressed = 0; + + if (type != CompressionType.NONE) { + // for compressed records, leave space for the header and the shallow message metadata + // and move the starting position to the value payload offset + buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD); + } + + // create the stream + bufferStream = new ByteBufferOutputStream(buffer); + appendStream = wrapForOutput(bufferStream, type, blockSize); + } + + public Compressor(ByteBuffer buffer, CompressionType type) { + this(buffer, type, COMPRESSION_DEFAULT_BUFFER_SIZE); + } + + public ByteBuffer buffer() { + return bufferStream.buffer(); + } + + public void close() { + try { + appendStream.close(); + } catch (IOException e) { + throw new KafkaException(e); + } + + if (type != CompressionType.NONE) { + ByteBuffer buffer = bufferStream.buffer(); + int pos = buffer.position(); + // write the header, for the end offset write as number of records - 1 + buffer.position(initPos); + buffer.putLong(numRecords - 1); + buffer.putInt(pos - initPos - Records.LOG_OVERHEAD); + // write the shallow message (the crc and value size are not correct yet) + Record.write(buffer, null, null, type, 0, -1); + // compute the fill the value size + int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD; + buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET, valueSize); + // compute and fill the crc at the beginning of the message + long crc = Record.computeChecksum(buffer, + initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET, + pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET); + Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc); + // reset the position + buffer.position(pos); + + // update the compression ratio + float compressionRate = (float) buffer.position() / this.writtenUncompressed; + typeToRate[type.id] = typeToRate[type.id] * COMPRESSION_RATE_DAMPING_FACTOR + + compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR); + } + } + + // Note that for all the write operations below, IO exceptions should + // never be thrown since the underlying ByteBufferOutputStream does not 
throw IOException; + // therefore upon encountering this issue we just close the append stream. + + public void putLong(final long value) { + try { + appendStream.writeLong(value); + } catch (IOException e) { + throw new KafkaException("I/O exception when writing to the append stream, closing", e); + } + } + + public void putInt(final int value) { + try { + appendStream.writeInt(value); + } catch (IOException e) { + throw new KafkaException("I/O exception when writing to the append stream, closing", e); + } + } + + public void put(final ByteBuffer buffer) { + try { + appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit()); + } catch (IOException e) { + throw new KafkaException("I/O exception when writing to the append stream, closing", e); + } + } + + public void putByte(final byte value) { + try { + appendStream.write(value); + } catch (IOException e) { + throw new KafkaException("I/O exception when writing to the append stream, closing", e); + } + } + + public void put(final byte[] bytes, final int offset, final int len) { + try { + appendStream.write(bytes, offset, len); + } catch (IOException e) { + throw new KafkaException("I/O exception when writing to the append stream, closing", e); + } + } + + public void putRecord(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { + // put a record as un-compressed into the underlying stream + long crc = Record.computeChecksum(key, value, type, valueOffset, valueSize); + byte attributes = Record.computeAttributes(type); + putRecord(crc, attributes, key, value, valueOffset, valueSize); + } + + public void putRecord(byte[] key, byte[] value) { + putRecord(key, value, CompressionType.NONE, 0, -1); + } + + private void putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) { + Record.write(this, crc, attributes, key, value, valueOffset, valueSize); + } + + public void recordWritten(int size) { + numRecords += 1; + writtenUncompressed += size; + } + + public long estimatedBytesWritten() { + if (type == CompressionType.NONE) { + return bufferStream.buffer().position(); + } else { + // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes + return (long) (writtenUncompressed * typeToRate[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR); + } + } + + // the following two functions also need to be public since they are used in MemoryRecords.iteration + + static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) { + try { + switch (type) { + case NONE: + return new DataOutputStream(buffer); + case GZIP: + return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize)); + case SNAPPY: + // dynamically load the snappy class to avoid runtime dependency + // on snappy if we are not using it + try { + Class SnappyOutputStream = Class.forName("org.xerial.snappy.SnappyOutputStream"); + OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class, Integer.TYPE) + .newInstance(buffer, bufferSize); + return new DataOutputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type: " + type); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + + static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type) { + try { + switch (type) { + case NONE: + return new 
DataInputStream(buffer); + case GZIP: + return new DataInputStream(new GZIPInputStream(buffer)); + case SNAPPY: + // dynamically load the snappy class to avoid runtime dependency + // on snappy if we are not using it + try { + Class SnappyInputStream = Class.forName("org.xerial.snappy.SnappyInputStream"); + InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class) + .newInstance(buffer); + return new DataInputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type: " + type); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 9d8935f..428968c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -16,53 +16,99 @@ */ package org.apache.kafka.common.record; +import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.util.Iterator; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.AbstractIterator; - /** * A {@link Records} implementation backed by a ByteBuffer. */ public class MemoryRecords implements Records { - private final ByteBuffer buffer; + private final Compressor compressor; + private final int capacity; + private ByteBuffer buffer; + private boolean writable; + + // Construct a writable memory records + private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable) { + this.writable = writable; + this.capacity = buffer.capacity(); + if (this.writable) { + this.buffer = null; + this.compressor = new Compressor(buffer, type); + } else { + this.buffer = buffer; + this.compressor = null; + } + } - public MemoryRecords(int size) { - this(ByteBuffer.allocate(size)); + public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) { + return new MemoryRecords(buffer, type, true); } - public MemoryRecords(ByteBuffer buffer) { - this.buffer = buffer; + public static MemoryRecords iterableRecords(ByteBuffer buffer) { + return new MemoryRecords(buffer, CompressionType.NONE, false); } /** * Append the given record and offset to the buffer */ public void append(long offset, Record record) { - buffer.putLong(offset); - buffer.putInt(record.size()); - buffer.put(record.buffer()); + if (!writable) + throw new IllegalStateException("Memory records is not writable"); + + int size = record.size(); + compressor.putLong(offset); + compressor.putInt(size); + compressor.put(record.buffer()); + compressor.recordWritten(size + Records.LOG_OVERHEAD); record.buffer().rewind(); } /** * Append a new record and offset to the buffer */ - public void append(long offset, byte[] key, byte[] value, CompressionType type) { - buffer.putLong(offset); - buffer.putInt(Record.recordSize(key, value)); - Record.write(this.buffer, key, value, type); + public void append(long offset, byte[] key, byte[] value) { + if (!writable) + throw new IllegalStateException("Memory records is not writable"); + + int size = Record.recordSize(key, value); + compressor.putLong(offset); + compressor.putInt(size); + compressor.putRecord(key, value); + compressor.recordWritten(size + Records.LOG_OVERHEAD); } /** * Check if we have room for a 
new record containing the given key/value pair + * + * Note that the return value is based on the estimate of the bytes written to the compressor, + * which may not be accurate if compression is really used. When this happens, the following + * append may cause dynamic buffer re-allocation in the underlying byte buffer stream. */ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value); + return this.writable && + this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value); + } + + public boolean isFull() { + return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten(); + } + + /** + * Close this batch for no more appends + */ + public void close() { + compressor.close(); + writable = false; + buffer = compressor.buffer(); } /** Write the records in this set to the given channel */ @@ -74,7 +120,14 @@ public class MemoryRecords implements Records { * The size of this record set */ public int sizeInBytes() { - return this.buffer.position(); + return compressor.buffer().position(); + } + + /** + * Return the capacity of the buffer + */ + public int capacity() { + return this.capacity; } /** @@ -86,34 +139,79 @@ public class MemoryRecords implements Records { @Override public Iterator iterator() { - return new RecordsIterator(this.buffer); + ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip(); + return new RecordsIterator(copy, CompressionType.NONE, false); } - /* TODO: allow reuse of the buffer used for iteration */ public static class RecordsIterator extends AbstractIterator { private final ByteBuffer buffer; - - public RecordsIterator(ByteBuffer buffer) { - ByteBuffer copy = buffer.duplicate(); - copy.flip(); - this.buffer = copy; + private final DataInputStream stream; + private final CompressionType type; + private final boolean shallow; + private RecordsIterator innerIter; + + public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) { + this.type = type; + this.buffer = buffer; + this.shallow = shallow; + stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type); } + /* + * Read the next record from the buffer. + * + * Note that in the compressed message set, each message value size is set as the size + * of the un-compressed version of the message value, so when we do de-compression + * allocating an array of the specified size for reading compressed value data is sufficient. 
+ */ @Override protected LogEntry makeNext() { - if (buffer.remaining() < Records.LOG_OVERHEAD) - return allDone(); - long offset = buffer.getLong(); - int size = buffer.getInt(); - if (size < 0) - throw new IllegalStateException("Record with size " + size); - if (buffer.remaining() < size) - return allDone(); - ByteBuffer rec = buffer.slice(); - rec.limit(size); - this.buffer.position(this.buffer.position() + size); - return new LogEntry(offset, new Record(rec)); + if (innerDone()) { + try { + // read the offset + long offset = stream.readLong(); + // read record size + int size = stream.readInt(); + if (size < 0) + throw new IllegalStateException("Record with size " + size); + // read the record, if compression is used we cannot depend on size + // and hence has to do extra copy + ByteBuffer rec; + if (type == CompressionType.NONE) { + rec = buffer.slice(); + buffer.position(buffer.position() + size); + rec.limit(size); + } else { + byte[] recordBuffer = new byte[size]; + stream.read(recordBuffer, 0, size); + rec = ByteBuffer.wrap(recordBuffer); + } + LogEntry entry = new LogEntry(offset, new Record(rec)); + entry.record().ensureValid(); + + // decide whether to go shallow or deep iteration if it is compressed + CompressionType compression = entry.record().compressionType(); + if (compression == CompressionType.NONE || shallow) { + return entry; + } else { + // init the inner iterator with the value payload of the message, + // which will de-compress the payload to a set of messages + ByteBuffer value = entry.record().value(); + innerIter = new RecordsIterator(value, compression, true); + return innerIter.next(); + } + } catch (EOFException e) { + return allDone(); + } catch (IOException e) { + throw new KafkaException(e); + } + } else { + return innerIter.next(); + } } - } + private boolean innerDone() { + return (innerIter == null || !innerIter.hasNext()); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index f1dc977..ce1177e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.record; import java.nio.ByteBuffer; +import org.apache.kafka.common.utils.Crc32; import org.apache.kafka.common.utils.Utils; @@ -40,13 +41,15 @@ public final class Record { public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH; public static final int VALUE_SIZE_LENGTH = 4; - /** The amount of overhead bytes in a record */ - public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH; + /** + * The size for the record header + */ + public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH; /** - * The minimum valid size for the record header + * The amount of overhead bytes in a record */ - public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; + public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; /** * The current "magic" value @@ -71,27 +74,29 @@ public final class Record { } /** - * A constructor to create a LogRecord + * A constructor to create a LogRecord. If the record's compression type is not none, then + * its value payload should be already compressed with the specified type; the constructor + * would always write the value payload as is and will not do the compression itself. 
* * @param key The key of the record (null, if none) * @param value The record value - * @param codec The compression codec used on the contents of the record (if any) + * @param type The compression type used on the contents of the record (if any) * @param valueOffset The offset into the payload array used to extract payload * @param valueSize The size of the payload to use */ - public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { - this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize - : value.length - valueOffset))); - write(this.buffer, key, value, codec, valueOffset, valueSize); + public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { + this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, + value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset))); + write(this.buffer, key, value, type, valueOffset, valueSize); this.buffer.rewind(); } - public Record(byte[] key, byte[] value, CompressionType codec) { - this(key, value, codec, 0, -1); + public Record(byte[] key, byte[] value, CompressionType type) { + this(key, value, type, 0, -1); } - public Record(byte[] value, CompressionType codec) { - this(null, value, codec); + public Record(byte[] value, CompressionType type) { + this(null, value, type); } public Record(byte[] key, byte[] value) { @@ -102,40 +107,37 @@ public final class Record { this(null, value, CompressionType.NONE); } - public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { - // skip crc, we will fill that in at the end - int pos = buffer.position(); - buffer.position(pos + MAGIC_OFFSET); - buffer.put(CURRENT_MAGIC_VALUE); - byte attributes = 0; - if (codec.id > 0) - attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id)); - buffer.put(attributes); + // Write a record to the buffer, if the record's compression type is none, then + // its value payload should be already compressed with the specified type + public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { + // construct the compressor with compression type none since this function will not do any + //compression according to the input type, it will just write the record's payload as is + Compressor compressor = new Compressor(buffer, CompressionType.NONE, buffer.capacity()); + compressor.putRecord(key, value, type, valueOffset, valueSize); + } + + public static void write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) { + // write crc + compressor.putInt((int) (crc & 0xffffffffL)); + // write magic value + compressor.putByte(CURRENT_MAGIC_VALUE); + // write attributes + compressor.putByte(attributes); // write the key if (key == null) { - buffer.putInt(-1); + compressor.putInt(-1); } else { - buffer.putInt(key.length); - buffer.put(key, 0, key.length); + compressor.putInt(key.length); + compressor.put(key, 0, key.length); } // write the value if (value == null) { - buffer.putInt(-1); + compressor.putInt(-1); } else { int size = valueSize >= 0 ? 
valueSize : (value.length - valueOffset); - buffer.putInt(size); - buffer.put(value, valueOffset, size); + compressor.putInt(size); + compressor.put(value, valueOffset, size); } - - // now compute the checksum and fill it in - long crc = computeChecksum(buffer, - buffer.arrayOffset() + pos + MAGIC_OFFSET, - buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset()); - Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc); - } - - public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) { - write(buffer, key, value, codec, 0, -1); } public static int recordSize(byte[] key, byte[] value) { @@ -150,13 +152,51 @@ public final class Record { return this.buffer; } + public static byte computeAttributes(CompressionType type) { + byte attributes = 0; + if (type.id > 0) + attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id)); + return attributes; + } + /** * Compute the checksum of the record from the record contents */ public static long computeChecksum(ByteBuffer buffer, int position, int size) { - return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset()); + Crc32 crc = new Crc32(); + crc.update(buffer.array(), buffer.arrayOffset() + position, size); + return crc.getValue(); + } + + /** + * Compute the checksum of the record from the attributes, key and value payloads + */ + public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { + Crc32 crc = new Crc32(); + crc.update(CURRENT_MAGIC_VALUE); + byte attributes = 0; + if (type.id > 0) + attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id)); + crc.update(attributes); + // update for the key + if (key == null) { + crc.updateInt(-1); + } else { + crc.updateInt(key.length); + crc.update(key, 0, key.length); + } + // update for the value + if (value == null) { + crc.updateInt(-1); + } else { + int size = valueSize >= 0 ? 
valueSize : (value.length - valueOffset); + crc.updateInt(size); + crc.update(value, valueOffset, size); + } + return crc.getValue(); } + /** * Compute the checksum of the record from the record contents */ @@ -239,7 +279,7 @@ public final class Record { } /** - * The compression codec used with this record + * The compression type used with this record */ public CompressionType compressionType() { return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java index 153c5a6..047ca98 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java @@ -28,6 +28,30 @@ import java.util.zip.Checksum; */ public class Crc32 implements Checksum { + /** + * Compute the CRC32 of the byte array + * + * @param bytes The array to compute the checksum for + * @return The CRC32 + */ + public static long crc32(byte[] bytes) { + return crc32(bytes, 0, bytes.length); + } + + /** + * Compute the CRC32 of the segment of the byte array given by the specified size and offset + * + * @param bytes The bytes to checksum + * @param offset the offset at which to begin checksumming + * @param size the number of bytes to checksum + * @return The CRC32 + */ + public static long crc32(byte[] bytes, int offset, int size) { + Crc32 crc = new Crc32(); + crc.update(bytes, offset, size); + return crc.getValue(); + } + /** the current CRC value, bit-flipped */ private int crc; @@ -97,6 +121,18 @@ public class Crc32 implements Checksum { crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)]; } + /** + * Update the CRC32 given an integer + */ + final public void updateInt(int input) { + update((byte) (input >> 24)); + update((byte) (input >> 16)); + update((byte) (input >> 8)); + update((byte) input /* >> 0 */); + } + + + /* * CRC-32 lookup tables generated by the polynomial 0xEDB88320. See also TestPureJavaCrc32.Table. */ diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 0c6b365..50af601 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -92,30 +92,6 @@ public class Utils { } /** - * Compute the CRC32 of the byte array - * - * @param bytes The array to compute the checksum for - * @return The CRC32 - */ - public static long crc32(byte[] bytes) { - return crc32(bytes, 0, bytes.length); - } - - /** - * Compute the CRC32 of the segment of the byte array given by the specificed size and offset - * - * @param bytes The bytes to checksum - * @param offset the offset at which to begin checksumming - * @param size the number of bytes to checksum - * @return The CRC32 - */ - public static long crc32(byte[] bytes, int offset, int size) { - Crc32 crc = new Crc32(); - crc.update(bytes, offset, size); - return crc.getValue(); - } - - /** * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!). 
*/ diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index b0745b5..94a1112 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -22,29 +22,35 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; +import java.util.*; -import org.apache.kafka.common.record.LogEntry; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(value = Parameterized.class) public class MemoryRecordsTest { + private CompressionType compression; + + public MemoryRecordsTest(CompressionType compression) { + this.compression = compression; + } + @Test public void testIterator() { - MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024)); - MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024)); + MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression); + MemoryRecords recs2 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression); List list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()), new Record("b".getBytes(), "2".getBytes()), new Record("c".getBytes(), "3".getBytes())); for (int i = 0; i < list.size(); i++) { Record r = list.get(i); recs1.append(i, r); - recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType()); + recs2.append(i, toArray(r.key()), toArray(r.value())); } + recs1.close(); + recs2.close(); for (int iteration = 0; iteration < 2; iteration++) { for (MemoryRecords recs : Arrays.asList(recs1, recs2)) { @@ -54,10 +60,18 @@ public class MemoryRecordsTest { LogEntry entry = iter.next(); assertEquals((long) i, entry.offset()); assertEquals(list.get(i), entry.record()); + entry.record().ensureValid(); } assertFalse(iter.hasNext()); } } } + @Parameterized.Parameters + public static Collection data() { + List values = new ArrayList(); + for (CompressionType type: CompressionType.values()) + values.add(new Object[] { type }); + return values; + } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java index ae54d67..2765913 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java @@ -27,9 +27,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.InvalidRecordException; -import org.apache.kafka.common.record.Record; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -66,6 +63,10 @@ public class RecordTest { @Test public void testChecksum() { assertEquals(record.checksum(), record.computeChecksum()); + assertEquals(record.checksum(), record.computeChecksum( + this.key == null ? null : this.key.array(), + this.value == null ? 
null : this.value.array(), + this.compression, 0, -1)); assertTrue(record.isValid()); for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) { Record copy = copyOf(record); @@ -95,9 +96,11 @@ public class RecordTest { @Parameters public static Collection data() { + byte[] payload = new byte[1000]; + Arrays.fill(payload, (byte) 1); List values = new ArrayList(); - for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes())) - for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes())) + for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload)) + for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload)) for (CompressionType compression : CompressionType.values()) values.add(new Object[] { key, value, compression }); return values; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java new file mode 100644 index 0000000..6b32381 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.utils; + +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class CrcTest { + + @Test + public void testUpdate() { + final byte bytes[] = "Any String you want".getBytes(); + final int len = bytes.length; + + Crc32 crc1 = new Crc32(); + Crc32 crc2 = new Crc32(); + Crc32 crc3 = new Crc32(); + + crc1.update(bytes, 0, len); + for(int i = 0; i < len; i++) + crc2.update(bytes[i]); + crc3.update(bytes, 0, len/2); + crc3.update(bytes, len/2, len-len/2); + + assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue()); + assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue()); + } + + @Test + public void testUpdateInt() { + final int value = 1000; + final ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putInt(value); + + Crc32 crc1 = new Crc32(); + Crc32 crc2 = new Crc32(); + + crc1.updateInt(value); + crc2.update(buffer.array(), buffer.arrayOffset(), 4); + + assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 36cfc0f..76a17e8 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -88,7 +88,7 @@ public class TestUtils { /** * Generate an array of random bytes * - * @param numBytes The size of the array + * @param size The size of the array */ public static byte[] randomBytes(int size) { byte[] bytes = new byte[size]; diff --git a/config/log4j.properties b/config/log4j.properties index baa698b..05bfd23 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -54,8 +54,8 @@ log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n # Turn on all our debugging info #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender -#log4j.logger.kafka.perf=DEBUG, kafkaAppender -#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +log4j.logger.kafka.perf=DEBUG, kafkaAppender +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG log4j.logger.kafka=INFO, kafkaAppender diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index dd39ff2..57386b1 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -255,8 +255,8 @@ object ConsoleProducer { class NewShinyProducer(producerConfig: ProducerConfig) extends Producer { val props = new Properties() props.put("metadata.broker.list", producerConfig.brokerList) - val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec - props.put("compression.codec", codec.toString) + val compression = if(producerConfig.compress) DefaultCompressionCodec.name else NoCompressionCodec.name + props.put("compression.type", compression) props.put("send.buffer.bytes", producerConfig.socketBuffer.toString) props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString) props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString) diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala new file mode 100644 index 0000000..1d73aca --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api.test + +import java.util.{Properties, Collection, ArrayList} + +import org.scalatest.junit.JUnit3Suite +import org.junit.runners.Parameterized +import org.junit.runner.RunWith +import org.junit.runners.Parameterized.Parameters +import org.junit.{After, Before, Test} +import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig} +import org.junit.Assert._ + +import kafka.api.FetchRequestBuilder +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.consumer.SimpleConsumer +import kafka.message.Message +import kafka.zk.ZooKeeperTestHarness +import kafka.utils.{Utils, TestUtils} + +import scala.Array + + +@RunWith(value = classOf[Parameterized]) +class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness { + private val brokerId = 0 + private val port = TestUtils.choosePort + private var server: KafkaServer = null + + private val props = TestUtils.createBrokerConfig(brokerId, port) + private val config = new KafkaConfig(props) + + private val topic = "topic" + private val numRecords = 100 + + @Before + override def setUp() { + super.setUp() + server = TestUtils.createServer(config) + } + + @After + override def tearDown() { + server.shutdown + Utils.rm(server.config.logDirs) + super.tearDown() + } + + /** + * testCompression + * + * Compressed messages should be able to sent and consumed correctly + */ + @Test + def testCompression() { + + val props = new Properties() + props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config))) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) + var producer = new KafkaProducer(props) + + try { + // create topic + TestUtils.createTopic(zkClient, topic, 1, 1, List(server)) + val partition = 0 + + // prepare the messages + val messages = for (i <-0 until numRecords) + yield ("value" + i).getBytes + + // make sure the returned messages are correct + val responses = for (message <- messages) + yield producer.send(new ProducerRecord(topic, null, null, message)) + val futures = responses.toList + for ((future, offset) <- futures zip (0 until numRecords)) { + assertEquals(offset.toLong, future.get.offset) + } + + // make sure the fetched message count match + val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "") + val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + val messageSet = fetchResponse.messageSet(topic, 
partition).iterator.toBuffer + assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size) + + var index = 0 + for (message <- messages) { + assertEquals(new Message(bytes = message), messageSet(index).message) + assertEquals(index.toLong, messageSet(index).offset) + index += 1 + } + } finally { + if (producer != null) { + producer.close() + producer = null + } + } + } +} + +object ProducerCompressionTest { + + // NOTE: Must return collection of Array[AnyRef] (NOT Array[Any]). + @Parameters + def parameters: Collection[Array[String]] = { + val list = new ArrayList[Array[String]]() + list.add(Array("gzip")) + list.add(Array("snappy")) + list + } +} diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index c002f5e..525a060 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -319,7 +319,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString) - val producer = new KafkaProducer(producerProps) override def doWork(): Unit = { @@ -335,5 +334,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness case e : Exception => failed = true } } + + override def shutdown(){ + super.shutdown() + producer.close + } } } \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 66ea76b..3c37330 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -17,21 +17,20 @@ package kafka.api.test -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{Utils, TestUtils} -import kafka.zk.ZooKeeperTestHarness -import kafka.consumer.SimpleConsumer -import kafka.api.FetchRequestBuilder -import kafka.message.Message +import java.util.Properties +import java.lang.{Integer, IllegalArgumentException} import org.apache.kafka.clients.producer._ - import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ -import java.util.Properties -import java.lang.{Integer, IllegalArgumentException} +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{Utils, TestUtils} +import kafka.zk.ZooKeeperTestHarness +import kafka.consumer.SimpleConsumer +import kafka.api.FetchRequestBuilder +import kafka.message.Message class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -76,15 +75,10 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { super.tearDown() } - class PrintOffsetCallback extends Callback { + class CheckErrorCallback extends Callback { def onCompletion(metadata: RecordMetadata, exception: Exception) { if (exception != null) fail("Send callback returns the following exception", exception) - try { - System.out.println("The message we just sent is marked as [" + metadata.partition + "] : " + metadata.offset); - } catch { - case e: Throwable => fail("Should succeed sending the message", e) - } } } @@ -100,7 +94,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { 
props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) var producer = new KafkaProducer(props) - val callback = new PrintOffsetCallback + val callback = new CheckErrorCallback try { // create topic diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index 3df0d13..9e4ebaf 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -212,6 +212,7 @@ object ProducerPerformance extends Logging { props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) props.put("request.retries", config.producerNumRetries.toString) props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) + props.put("compression.type", config.compressionCodec.name) val producer = new KafkaProducer(props) def send(topic: String, partition: Long, bytes: Array[Byte]) {
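Reviewer note (not part of the patch): the user-visible surface of this change is the new "compression.type" producer config (ProducerConfig.COMPRESSION_TYPE_CONFIG), whose accepted values per CompressionType are "none", "gzip" and "snappy". Below is a minimal usage sketch against the client API as it exists in this tree; the broker address "localhost:9092" and the topic name "test" are assumptions for illustration only.

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CompressedSendExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // broker address is an assumption for this sketch
        props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092");
        // the config key added by this patch; accepted values come from CompressionType
        props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        KafkaProducer producer = new KafkaProducer(props);
        try {
            // partition is left null so the default partitioner assigns one
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord("test", null, "key".getBytes(), "value".getBytes()));
            System.out.println("record appended at offset " + future.get().offset());
        } finally {
            producer.close();
        }
    }
}

Note that with compression enabled, batch fullness is judged by Compressor.estimatedBytesWritten() rather than by the remaining buffer space, so a batch can occasionally outgrow its pooled buffer (the ByteBufferOutputStream re-allocates in place) and, as the comment added in RecordAccumulator points out, may in rare cases even exceed the maximum request size; such a batch is still sent on its own in a single request.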