diff --git a/build.gradle b/build.gradle index 84fa0d6..12a533f 100644 --- a/build.gradle +++ b/build.gradle @@ -152,7 +152,6 @@ project(':core') { compile 'com.yammer.metrics:metrics-core:2.2.0' compile 'com.yammer.metrics:metrics-annotation:2.2.0' compile 'net.sf.jopt-simple:jopt-simple:3.2' - compile 'org.xerial.snappy:snappy-java:1.0.5' testCompile 'junit:junit:4.1' testCompile 'org.easymock:easymock:3.0' @@ -318,6 +317,8 @@ project(':clients') { dependencies { compile "org.slf4j:slf4j-api:1.7.6" + compile 'org.xerial.snappy:snappy-java:1.0.5' + testCompile 'com.novocode:junit-interface:0.9' testRuntime "$slf4jlog4j" } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1ac6943..1ff9174 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -69,6 +69,7 @@ public class KafkaProducer implements Producer { private final Sender sender; private final Metrics metrics; private final Thread ioThread; + private final CompressionType compressionType; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -99,6 +100,7 @@ public class KafkaProducer implements Producer { config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG); + this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), @@ -224,7 +226,7 @@ public class KafkaProducer implements Producer { ensureValidSize(record.key(), record.value()); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); + FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback); this.sender.wakeup(); return future; // For API exceptions return them in the future; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 32e12ad..48706ba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -136,6 +136,11 @@ public class ProducerConfig extends AbstractConfig { public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; /** + * The compression type for all data generated. The default is none (i.e. no compression) + */ + public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; + + /** * Should we register the Kafka metrics as JMX mbeans? 
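Editor's note: with the changes above, compression becomes a producer-level setting ("compression.type", default "none", also "gzip" or "snappy") that KafkaProducer resolves once and hands to every accumulator append. A minimal usage sketch, assuming this patch is applied and a broker at localhost:9092 (the address and topic name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompressedProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092");
        // New in this patch; defaults to "none" when omitted.
        props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

        KafkaProducer producer = new KafkaProducer(props);
        try {
            // Every record sent by this producer is now batched and gzip-compressed.
            producer.send(new ProducerRecord("my-topic", null, "key".getBytes(), "value".getBytes())).get();
        } finally {
            producer.close();
        }
    }
}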
*/ public static final String ENABLE_JMX_CONFIG = "enable.jmx"; @@ -158,9 +163,10 @@ public class ProducerConfig extends AbstractConfig { .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") - .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "") - .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah"); + .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah") + .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah") + .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, ""); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 616e100..6d86e6b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -28,8 +28,8 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -45,6 +45,10 @@ import org.slf4j.LoggerFactory; */ public final class RecordAccumulator { + // We need a minimum batch size to avoid the corner case where + // log-overhead + record-overhead + compression-header > batch size > un-compressed message size + private static final int MIN_BATCH_SIZE = 100; + private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class); private volatile boolean closed; @@ -107,10 +111,10 @@ public final class RecordAccumulator { metrics.addMetric("buffer_available_bytes", "The total amount of buffer memory that is available (not currently used for buffering records).", new Measurable() { - public double measure(MetricConfig config, long now) { - return free.availableMemory(); - } - }); + public double measure(MetricConfig config, long now) { + return free.availableMemory(); + } + }); } /** @@ -132,7 +136,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch batch = dq.peekLast(); if (batch != null) { - FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback); + FutureRecordMetadata future = batch.tryAppend(key, value, callback); if (future != null) return future; } @@ -140,12 +144,13 @@ public final class RecordAccumulator { // we don't have an in-progress record batch try to allocate a new batch int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); + size = Math.max(size, MIN_BATCH_SIZE); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size); synchronized (dq) { RecordBatch first = dq.peekLast(); if (first != null) { - FutureRecordMetadata future = first.tryAppend(key, value, compression, callback); + FutureRecordMetadata future = first.tryAppend(key, value, 
callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen // often... @@ -153,8 +158,28 @@ public final class RecordAccumulator { return future; } } - RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds()); - FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback)); + MemoryRecords records = new MemoryRecords(buffer, compression); + RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); + FutureRecordMetadata future = batch.tryAppend(key, value, callback); + + // when compression is enabled, it is possible that the compressed message size is larger than + // its uncompressed counterpart, causing the single compressed message to be bigger than the allocated + // buffer; under this case, we will re-build the batch as uncompressed version and re-append + if (future == null) { + if (compression == CompressionType.NONE) { + throw new IllegalStateException("Appending record to un-compressed batch failed."); + } else { + log.warn("Appending record with {} bytes to the compressed batch failed, " + + "re-appending to un-compressed batch instead", Record.recordSize(key, value)); + buffer.clear(); + records = new MemoryRecords(buffer, CompressionType.NONE); + batch = new RecordBatch(tp, records, time.milliseconds()); + future = Utils.notNull(batch.tryAppend(key, value, callback)); + // close the batch so that future messages will still be batched with compression + batch.records.close(); + } + } + dq.addLast(batch); return future; } @@ -196,8 +221,10 @@ public final class RecordAccumulator { boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining(); boolean expired = now - batch.created >= lingerMs; boolean sendable = full || expired || exhausted || closed; - if (sendable && !backingOff) + if (sendable && !backingOff) { + batch.records.close(); ready.add(batch.topicPartition); + } } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 038a05a..20fc9a0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -17,7 +17,6 @@ import java.util.List; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,11 +53,13 @@ public final class RecordBatch { * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. 
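Editor's note: the fallback above exists because a compressed message is not guaranteed to be smaller than its input; for tiny records the codec framing alone can exceed any saving, which is also why MIN_BATCH_SIZE is introduced. A self-contained, JDK-only sketch of that overhead:

import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;

public class CompressionOverheadDemo {
    public static void main(String[] args) throws Exception {
        byte[] record = "1".getBytes();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(out);
        gzip.write(record);
        gzip.close();
        // The gzip header and trailer alone are about 18 bytes, so the "compressed"
        // form of a 1-byte record is far larger than the raw bytes.
        System.out.println("raw: " + record.length + " bytes, gzipped: " + out.size() + " bytes");
    }
}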
*/ - public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) { + public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { if (!this.records.hasRoomFor(key, value)) { return null; } else { - this.records.append(0L, key, value, compression); + if (!this.records.append(0L, key, value)) + return null; + FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) thunks.add(new Thunk(callback, future)); @@ -71,7 +72,7 @@ public final class RecordBatch { * Complete the request * * @param baseOffset The base offset of the messages assigned by the server - * @param errorCode The error code or 0 if no error + * @param exception The exception returned or null if no exception */ public void done(long baseOffset, RuntimeException exception) { this.produceFuture.done(topicPartition, baseOffset, exception); diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 3ebbb80..3010a6e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -30,7 +30,7 @@ public class ProducerPerformance { public static void main(String[] args) throws Exception { if (args.length != 5) { - System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks"); + System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks [compression_type]"); System.exit(1); } String url = args[0]; @@ -45,6 +45,8 @@ public class ProducerPerformance { props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024)); props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024)); + if (args.length == 6) + props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]); KafkaProducer producer = new KafkaProducer(props); Callback callback = new Callback() { diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java new file mode 100644 index 0000000..f7577dc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed input stream
+ */
+public class ByteBufferInputStream extends InputStream {
+
+    ByteBuffer buffer;
+
+    public ByteBufferInputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public int read() throws IOException {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+        return buffer.get() & 0xFF;
+    }
+
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buffer.remaining());
+        buffer.get(bytes, off, len);
+        return len;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
new file mode 100644
index 0000000..8cf759c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
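Editor's note: a small usage sketch of the ByteBufferInputStream added above (assuming the patched class is on the classpath). It simply drains whatever remains between the buffer's position and limit and returns -1 once the buffer is exhausted, which is what lets the record iterator detect the end of a batch:

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.ByteBufferInputStream;

public class ByteBufferInputStreamDemo {
    public static void main(String[] args) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put("hello".getBytes());
        buffer.flip(); // switch the buffer from writing to reading

        ByteBufferInputStream in = new ByteBufferInputStream(buffer);
        byte[] out = new byte[8];
        int n = in.read(out, 0, out.length);        // reads the 5 available bytes
        System.out.println(n + " bytes: " + new String(out, 0, n));
        System.out.println(in.read());              // -1: buffer exhausted
    }
}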
+ */ +package org.apache.kafka.common.record; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + + +/** + * A byte buffer backed output outputStream + */ +public class ByteBufferOutputStream extends OutputStream { + + ByteBuffer buffer; + + public ByteBufferOutputStream(ByteBuffer buffer) { + this.buffer = buffer; + } + + public void write(int b) throws IOException { + buffer.put((byte) b); + } + + public void write(byte[] bytes, int off, int len) throws IOException { + buffer.put(bytes, off, len); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 906da02..801ad51 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -16,6 +16,13 @@ */ package org.apache.kafka.common.record; +import java.io.*; +import java.nio.ByteBuffer; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.apache.kafka.common.KafkaException; + /** * The compression type to use */ @@ -30,6 +37,68 @@ public enum CompressionType { this.name = name; } + public DataOutputStream wrapForOutput(ByteBuffer buffer) { + try { + switch (id) { + case 0: + return new DataOutputStream( + new ByteBufferOutputStream(buffer) + ); + case 1: + return new DataOutputStream( + new GZIPOutputStream( + new ByteBufferOutputStream(buffer) + )); + case 2: + // dynamically load the snappy class to avoid runtime dependency + // on snappy if we are not using it + try { + Class SnappyOutputStream = Class.forName("org.xerial.snappy.SnappyOutputStream"); + OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class) + .newInstance(new ByteBufferOutputStream(buffer)); + return new DataOutputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type id: " + id); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + + public DataInputStream wrapForInput(ByteBuffer buffer) { + try { + switch (id) { + case 0: + return new DataInputStream( + new ByteBufferInputStream(buffer) + ); + case 1: + return new DataInputStream( + new GZIPInputStream( + new ByteBufferInputStream(buffer) + )); + case 2: + ClassLoader classLoader = ClassLoader.class.getClassLoader(); + + try { + Class SnappyInputStream = Class.forName("org.xerial.snappy.SnappyInputStream"); + InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class) + .newInstance(new ByteBufferInputStream(buffer)); + return new DataInputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type id: " + id); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + public static CompressionType forId(int id) { switch (id) { case 0: @@ -53,4 +122,5 @@ public enum CompressionType { else throw new IllegalArgumentException("Unknown compression name: " + name); } + } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java new file mode 100644 index 0000000..ee88fb1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + + + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; + +public class Compressor { + + private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class); + + private final int initialPos; + private final ByteBuffer buffer; + private final CompressionType type; + private long numRecords; + private DataOutputStream appendStream; + + public Compressor(ByteBuffer buffer, CompressionType type) { + this.buffer = buffer; + this.type = type; + this.initialPos = buffer.position(); + } + + // when the compressed message size is bigger than its uncompressed counterpart, + // the underlying byte buffer may throw a BufferOverflowException, + // we need to catch this and return false indicating append failure + public void maybeInit() { + if (appendStream != null) + return; + + if (type != CompressionType.NONE) { + // for compressed records, first write the place-holder header + this.buffer.position(initialPos + Records.LOG_OVERHEAD); + if(!Record.write(this.buffer, null, null, type, 0, -1)) + throw new IllegalStateException("Error when initializing the compressor for" + type.name() + " compressed records"); + // move the starting position for value payload + this.buffer.position(initialPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD); + } + + // create the stream + appendStream = type.wrapForOutput(this.buffer); + } + + public void close() { + if (appendStream == null) + throw new IllegalStateException("The append stream has never been initialized before it is closed"); + + try { + appendStream.close(); + } catch (IOException e) { + throw new KafkaException(e); + } + + if (type != CompressionType.NONE) { + // wrap compressed messages as a single shallow message + int pos = buffer.position(); + // compute the fill the value size + int valueSize = pos - initialPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD; + buffer.putInt(initialPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET, valueSize); + // compute and fill the crc at the beginning of the message + long crc = Record.computeChecksum(buffer, + initialPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET, + pos - initialPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET); + Utils.writeUnsignedInt(buffer, initialPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc); + // write the header, for the end offset write as number of records - 1 + buffer.position(initialPos); + buffer.putLong(numRecords - 1); + buffer.putInt(pos - initialPos - Records.LOG_OVERHEAD); + buffer.position(pos); + } + } + + public boolean putLong(final long value) { + 
maybeInit(); + int pos = buffer.position(); + try { + appendStream.writeLong(value); + } catch (BufferOverflowException e) { + log.warn("Buffer overflow in writing long value {}, rewinding the partial write.", value); + this.buffer.position(pos); + return false; + } catch (IOException e) { + log.warn("I/O exception in writing long value {}, rewinding the partial write.", value); + this.buffer.position(pos); + return false; + } + return true; + } + + public boolean putInt(final int value) { + maybeInit(); + int pos = buffer.position(); + try { + appendStream.writeInt(value); + } catch (BufferOverflowException e) { + log.warn("Buffer overflow in writing int value {}, rewinding the partial write.", value); + this.buffer.position(pos); + return false; + } catch (IOException e) { + log.warn("I/O exception in writing int value {}, rewinding the partial write.", value); + this.buffer.position(pos); + return false; + } + return true; + } + + public boolean put(final ByteBuffer buffer) { + maybeInit(); + int pos = buffer.position(); + try { + appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit()); + } catch (BufferOverflowException e) { + log.warn("Buffer overflow in writing byte buffer {}, rewinding the partial write.", buffer); + this.buffer.position(pos); + return false; + } catch (IOException e) { + log.warn("I/O exception in writing byte buffer {}, rewinding the partial write.", buffer); + this.buffer.position(pos); + return false; + } + return true; + } + + public boolean putByte(final byte value) { + maybeInit(); + int pos = buffer.position(); + try { + appendStream.writeByte(value); + } catch (BufferOverflowException e) { + log.warn("Buffer overflow in writing byte {}, rewinding the partial write.", value); + this.buffer.position(pos); + return false; + } catch (IOException e) { + log.warn("I/O exception in writing byte {}, rewinding the partial write.", value); + this.buffer.position(pos); + return false; + } + return true; + } + + public boolean put(final byte[] bytes, final int offset, final int len) { + maybeInit(); + int pos = buffer.position(); + try { + appendStream.write(bytes, offset, len); + } catch (BufferOverflowException e) { + log.warn("Buffer overflow in writing {} bytes, rewinding the partial write.", len); + this.buffer.position(pos); + return false; + } catch (IOException e) { + log.warn("I/O exception in writing {} bytes, rewinding the partial write.", len); + this.buffer.position(pos); + return false; + } + return true; + } + + public boolean putRecord(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { + maybeInit(); + // put a record as un-compressed into the underlying stream + long crc = Record.computeChecksum(key, value, type, valueOffset, valueSize); + byte attributes = Record.computeAttributes(type); + return putRecord(crc, attributes, key, value, valueOffset, valueSize); + } + + public boolean putRecord(byte[] key, byte[] value) { + return putRecord(key, value, CompressionType.NONE, 0, -1); + } + + private boolean putRecord(final long crc, final byte attributes, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) { + int pos = buffer.position(); + boolean ret = Record.write(this, crc, attributes, key, value, valueOffset, valueSize); + if (!ret) { + log.warn("Error in writing record, rewinding the partial write."); + this.buffer.position(pos); + } + return ret; + } + + public void written() { + numRecords++; + } +} diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 9d8935f..f973cab 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -16,11 +16,15 @@ */ package org.apache.kafka.common.record; +import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.util.Iterator; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.AbstractIterator; @@ -30,39 +34,62 @@ import org.apache.kafka.common.utils.AbstractIterator; public class MemoryRecords implements Records { private final ByteBuffer buffer; + private final Compressor compressor; + private boolean closed; public MemoryRecords(int size) { - this(ByteBuffer.allocate(size)); + this(ByteBuffer.allocate(size), CompressionType.NONE); } - public MemoryRecords(ByteBuffer buffer) { + public MemoryRecords(ByteBuffer buffer, CompressionType type) { this.buffer = buffer; + this.closed = false; + this.compressor = new Compressor(buffer, type); } /** - * Append the given record and offset to the buffer + * Try to append the given record and offset to the buffer + * @return true if append succeeds, false otherwise */ - public void append(long offset, Record record) { - buffer.putLong(offset); - buffer.putInt(record.size()); - buffer.put(record.buffer()); + public boolean append(long offset, Record record) { + boolean written = true; + written = written && compressor.putLong(offset); + written = written && compressor.putInt(record.size()); + written = written && compressor.put(record.buffer()); record.buffer().rewind(); + if (written) compressor.written(); + return written; } /** - * Append a new record and offset to the buffer + * Try to append a new record and offset to the buffer + * @return true if append succeeds, false otherwise */ - public void append(long offset, byte[] key, byte[] value, CompressionType type) { - buffer.putLong(offset); - buffer.putInt(Record.recordSize(key, value)); - Record.write(this.buffer, key, value, type); + public boolean append(long offset, byte[] key, byte[] value) { + boolean written = true; + written = written && compressor.putLong(offset); + written = written && compressor.putInt(Record.recordSize(key, value)); + written = written && compressor.putRecord(key, value); + if (written) compressor.written(); + return written; } /** * Check if we have room for a new record containing the given key/value pair + * + * Note that even if this function returns true, the following append may still fail since + * the space is calculated assuming no-compression used. 
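Editor's note: with append() now returning a boolean, overflow handling moves to the caller, and close() becomes a required step before reading back a compressed batch because that is where the wrapper message's size, CRC and offset header are filled in. A short lifecycle sketch, assuming the patched classes (it mirrors what MemoryRecordsTest does further down):

import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;

public class MemoryRecordsLifecycleDemo {
    public static void main(String[] args) {
        MemoryRecords records = new MemoryRecords(ByteBuffer.allocate(1024), CompressionType.GZIP);
        String[] values = {"1", "2", "3"};
        for (int i = 0; i < values.length; i++) {
            // false means the buffer is full; the producer would seal this batch
            // and allocate a new one rather than throwing.
            if (!records.append(i, ("key" + i).getBytes(), values[i].getBytes()))
                break;
        }
        records.close(); // finalize the compressed wrapper before iterating

        Iterator<LogEntry> iter = records.iterator();
        while (iter.hasNext()) {
            LogEntry entry = iter.next();
            System.out.println(entry.offset() + " -> " + entry.record());
        }
    }
}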
*/ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value); + return !this.closed && this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value); + } + + /** + * Close this batch for no more appends + */ + public void close() { + compressor.close(); + this.closed = true; } /** Write the records in this set to the given channel */ @@ -86,34 +113,77 @@ public class MemoryRecords implements Records { @Override public Iterator iterator() { - return new RecordsIterator(this.buffer); + ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip(); + return new RecordsIterator(copy, CompressionType.NONE, false); } - /* TODO: allow reuse of the buffer used for iteration */ public static class RecordsIterator extends AbstractIterator { private final ByteBuffer buffer; - - public RecordsIterator(ByteBuffer buffer) { - ByteBuffer copy = buffer.duplicate(); - copy.flip(); - this.buffer = copy; + private final DataInputStream stream; + private final CompressionType type; + private final boolean shallow; + private RecordsIterator innerIter; + + public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) { + this.type = type; + this.buffer = buffer; + this.shallow = shallow; + stream = type.wrapForInput(this.buffer); } + /* + * Read the next record from the buffer, note that we cannot depend on remaining + * due to compression + */ @Override protected LogEntry makeNext() { - if (buffer.remaining() < Records.LOG_OVERHEAD) - return allDone(); - long offset = buffer.getLong(); - int size = buffer.getInt(); - if (size < 0) - throw new IllegalStateException("Record with size " + size); - if (buffer.remaining() < size) - return allDone(); - ByteBuffer rec = buffer.slice(); - rec.limit(size); - this.buffer.position(this.buffer.position() + size); - return new LogEntry(offset, new Record(rec)); + if (innerDone()) { + try { + // read the offset and record size as log overhead + long offset = stream.readLong(); + int size = stream.readInt(); + if (size < 0) + throw new IllegalStateException("Record with size " + size); + // read the record, if compression is used we cannot depend on size + // and hence has to do extra copy + ByteBuffer rec; + if (type == CompressionType.NONE) { + rec = buffer.slice(); + buffer.position(buffer.position() + size); + } else { + byte[] recordBuffer = new byte[size]; + stream.read(recordBuffer, 0, size); + rec = ByteBuffer.wrap(recordBuffer); + } + rec.limit(size); + LogEntry entry = new LogEntry(offset, new Record(rec)); + entry.record().ensureValid(); + + // decide whether to go shallow or deep iteration if it is compressed + CompressionType compression = entry.record().compressionType(); + if (compression == CompressionType.NONE || shallow) { + return entry; + } else { + // init the inner iterator with the value payload of the message + ByteBuffer value = entry.record().value(); + innerIter = new RecordsIterator(value, compression, true); + // the new compressed records should at least has one message + return innerIter.next(); + } + } catch (BufferUnderflowException e) { + throw new IllegalStateException("Invalid compressed record"); + } catch (EOFException e) { + return allDone(); + } catch (IOException e) { + throw new KafkaException(e); + } + } else { + return innerIter.next(); + } } - } + private boolean innerDone() { + return (innerIter == null || !innerIter.hasNext()); + } + } } diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index f1dc977..1ed4fde 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.record; import java.nio.ByteBuffer; +import org.apache.kafka.common.utils.Crc32; import org.apache.kafka.common.utils.Utils; @@ -40,13 +41,15 @@ public final class Record { public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH; public static final int VALUE_SIZE_LENGTH = 4; - /** The amount of overhead bytes in a record */ - public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH; + /** + * The size for the record header + */ + public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH; /** - * The minimum valid size for the record header + * The amount of overhead bytes in a record */ - public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; + public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; /** * The current "magic" value @@ -71,27 +74,30 @@ public final class Record { } /** - * A constructor to create a LogRecord + * A constructor to create a LogRecord. If the record's compression type is not none, then + * its value payload should be already compressed with the specified type; the constructor + * would always write the value payload as is and will not do the compression itself. * * @param key The key of the record (null, if none) * @param value The record value - * @param codec The compression codec used on the contents of the record (if any) + * @param type The compression type used on the contents of the record (if any) * @param valueOffset The offset into the payload array used to extract payload * @param valueSize The size of the payload to use */ - public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { - this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize - : value.length - valueOffset))); - write(this.buffer, key, value, codec, valueOffset, valueSize); + public Record(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { + this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, + value == null ? 0 : valueSize >= 0 ? 
valueSize : value.length - valueOffset))); + if (!write(this.buffer, key, value, type, valueOffset, valueSize)) + throw new IllegalStateException("Error when generating the record"); this.buffer.rewind(); } - public Record(byte[] key, byte[] value, CompressionType codec) { - this(key, value, codec, 0, -1); + public Record(byte[] key, byte[] value, CompressionType type) { + this(key, value, type, 0, -1); } - public Record(byte[] value, CompressionType codec) { - this(null, value, codec); + public Record(byte[] value, CompressionType type) { + this(null, value, type); } public Record(byte[] key, byte[] value) { @@ -102,40 +108,39 @@ public final class Record { this(null, value, CompressionType.NONE); } - public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { - // skip crc, we will fill that in at the end - int pos = buffer.position(); - buffer.position(pos + MAGIC_OFFSET); - buffer.put(CURRENT_MAGIC_VALUE); - byte attributes = 0; - if (codec.id > 0) - attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id)); - buffer.put(attributes); + // Write a record to the buffer, if the record's compression type is none, then + // its value payload should be already compressed with the specified type + public static boolean write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { + // construct the compressor with compression type none since this function will not do any + //compression according to the input type, it will just write the record's payload as is + Compressor compressor = new Compressor(buffer, CompressionType.NONE); + return compressor.putRecord(key, value, type, valueOffset, valueSize); + } + + public static boolean write(Compressor compressor, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) { + boolean written = true; + // write crc + written = written && compressor.putInt((int) (crc & 0xffffffffL)); + // write magic value + written = written && compressor.putByte(CURRENT_MAGIC_VALUE); + // write attributes + written = written && compressor.putByte(attributes); // write the key if (key == null) { - buffer.putInt(-1); + written = written && compressor.putInt(-1); } else { - buffer.putInt(key.length); - buffer.put(key, 0, key.length); + written = written && compressor.putInt(key.length); + written = written && compressor.put(key, 0, key.length); } // write the value if (value == null) { - buffer.putInt(-1); + written = written && compressor.putInt(-1); } else { int size = valueSize >= 0 ? 
valueSize : (value.length - valueOffset); - buffer.putInt(size); - buffer.put(value, valueOffset, size); + written = written && compressor.putInt(size); + written = written && compressor.put(value, valueOffset, size); } - - // now compute the checksum and fill it in - long crc = computeChecksum(buffer, - buffer.arrayOffset() + pos + MAGIC_OFFSET, - buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset()); - Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc); - } - - public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) { - write(buffer, key, value, codec, 0, -1); + return written; } public static int recordSize(byte[] key, byte[] value) { @@ -150,13 +155,52 @@ public final class Record { return this.buffer; } + public static byte computeAttributes(CompressionType type) { + byte attributes = 0; + if (type.id > 0) + attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id)); + return attributes; + } + /** * Compute the checksum of the record from the record contents */ public static long computeChecksum(ByteBuffer buffer, int position, int size) { - return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset()); + Crc32 crc = new Crc32(); + crc.update(buffer.array(), buffer.arrayOffset() + position, size); + return crc.getValue(); + } + + /** + * Compute the checksum of the record from the attributes, key and value payloads + */ + public static long computeChecksum(byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize) { + // TODO: we can remove this duplicate logic when we change the message format to put crc at the end + Crc32 crc = new Crc32(); + crc.update(CURRENT_MAGIC_VALUE); + byte attributes = 0; + if (type.id > 0) + attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id)); + crc.update(attributes); + // update for the key + if (key == null) { + Utils.updateCrc32(crc, -1); + } else { + Utils.updateCrc32(crc, key.length); + crc.update(key, 0, key.length); + } + // update for the value + if (value == null) { + Utils.updateCrc32(crc, -1); + } else { + int size = valueSize >= 0 ? 
valueSize : (value.length - valueOffset); + Utils.updateCrc32(crc, size); + crc.update(value, valueOffset, size); + } + return crc.getValue(); } + /** * Compute the checksum of the record from the record contents */ @@ -239,7 +283,7 @@ public final class Record { } /** - * The compression codec used with this record + * The compression type used with this record */ public CompressionType compressionType() { return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 0c6b365..77d3a96 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -102,8 +102,8 @@ public class Utils { } /** - * Compute the CRC32 of the segment of the byte array given by the specificed size and offset - * + * Compute the CRC32 of record given the key and value payload + * * @param bytes The bytes to checksum * @param offset the offset at which to begin checksumming * @param size the number of bytes to checksum @@ -116,6 +116,16 @@ public class Utils { } /** + * Update the CRC32 given an integer + */ + public static void updateCrc32(Crc32 crc, int input) { + crc.update((byte) (input >> 24)); + crc.update((byte) (input >> 16)); + crc.update((byte) (input >> 8)); + crc.update((byte) input /* >> 0 */); + } + + /** * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!). */ diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index b0745b5..fbf4361 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -26,25 +26,24 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import org.apache.kafka.common.record.LogEntry; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; import org.junit.Test; public class MemoryRecordsTest { @Test public void testIterator() { - MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024)); - MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024)); + MemoryRecords recs1 = new MemoryRecords(1024); + MemoryRecords recs2 = new MemoryRecords(1024); List list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()), new Record("b".getBytes(), "2".getBytes()), new Record("c".getBytes(), "3".getBytes())); for (int i = 0; i < list.size(); i++) { Record r = list.get(i); recs1.append(i, r); - recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType()); + recs2.append(i, toArray(r.key()), toArray(r.value())); } + recs1.close(); + recs2.close(); for (int iteration = 0; iteration < 2; iteration++) { for (MemoryRecords recs : Arrays.asList(recs1, recs2)) { @@ -54,10 +53,38 @@ public class MemoryRecordsTest { LogEntry entry = iter.next(); assertEquals((long) i, entry.offset()); assertEquals(list.get(i), entry.record()); + entry.record().ensureValid(); } assertFalse(iter.hasNext()); } } } + @Test + public void testCompressedIterator() { + MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024), CompressionType.GZIP); + MemoryRecords recs2 = new 
MemoryRecords(ByteBuffer.allocate(1024), CompressionType.SNAPPY); + List list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()), + new Record("b".getBytes(), "2".getBytes()), + new Record("c".getBytes(), "3".getBytes())); + for (int i = 0; i < list.size(); i++) { + Record r = list.get(i); + recs1.append(i, r); + recs2.append(i, r); + } + recs1.close(); + recs2.close(); + + for (MemoryRecords recs : Arrays.asList(recs1, recs2)) { + Iterator iter = recs.iterator(); + for (int i = 0; i < list.size(); i++) { + assertTrue(iter.hasNext()); + LogEntry entry = iter.next(); + assertEquals((long) i, entry.offset()); + assertEquals(list.get(i), entry.record()); + entry.record().ensureValid(); + } + assertFalse(iter.hasNext()); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java index ae54d67..b3c6c5f 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java @@ -27,9 +27,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.InvalidRecordException; -import org.apache.kafka.common.record.Record; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -39,12 +36,16 @@ import org.junit.runners.Parameterized.Parameters; public class RecordTest { private ByteBuffer key; + private byte[] keyArray; private ByteBuffer value; + private byte[] valueArray; private CompressionType compression; private Record record; public RecordTest(byte[] key, byte[] value, CompressionType compression) { + this.keyArray = key; this.key = key == null ? null : ByteBuffer.wrap(key); + this.valueArray = value; this.value = value == null ? null : ByteBuffer.wrap(value); this.compression = compression; this.record = new Record(key, value, compression); @@ -66,6 +67,8 @@ public class RecordTest { @Test public void testChecksum() { assertEquals(record.checksum(), record.computeChecksum()); + assertEquals(record.checksum(), record.computeChecksum( + this.keyArray, this.valueArray, this.compression, 0, -1)); assertTrue(record.isValid()); for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) { Record copy = copyOf(record); @@ -95,9 +98,11 @@ public class RecordTest { @Parameters public static Collection data() { + byte[] payload = new byte[1000]; + Arrays.fill(payload, (byte) 1); List values = new ArrayList(); - for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes())) - for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes())) + for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload)) + for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload)) for (CompressionType compression : CompressionType.values()) values.add(new Object[] { key, value, compression }); return values; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java new file mode 100644 index 0000000..9f5335e --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
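Editor's note: the CrcTest added below exercises Utils.updateCrc32, whose point is that pushing an int into the checksum byte-by-byte (big-endian) matches checksumming the serialized form, so the record CRC can be computed without first materializing the record buffer. The same equivalence can be seen with the JDK's CRC32, which should produce the same values as Kafka's pure-Java Crc32:

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class CrcIntUpdateDemo {
    public static void main(String[] args) {
        int value = 1000;

        CRC32 viaBuffer = new CRC32();
        viaBuffer.update(ByteBuffer.allocate(4).putInt(value).array());

        CRC32 viaBytes = new CRC32();
        viaBytes.update((byte) (value >> 24));
        viaBytes.update((byte) (value >> 16));
        viaBytes.update((byte) (value >> 8));
        viaBytes.update((byte) value);

        System.out.println(viaBuffer.getValue() == viaBytes.getValue()); // true
    }
}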
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class CrcTest { + + @Test + public void testUpdate() { + final byte bytes[] = "Any String you want".getBytes(); + final int len = bytes.length; + + Crc32 crc1 = new Crc32(); + Crc32 crc2 = new Crc32(); + Crc32 crc3 = new Crc32(); + + crc1.update(bytes, 0, len); + for(int i = 0; i < len; i++) + crc2.update(bytes[i]); + crc3.update(bytes, 0, len/2); + crc3.update(bytes, len/2, len-len/2); + + assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue()); + assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue()); + } + + @Test + public void testUpdateInt() { + final int value = 1000; + final ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putInt(value); + + Crc32 crc1 = new Crc32(); + Crc32 crc2 = new Crc32(); + + Utils.updateCrc32(crc1, value); + crc2.update(buffer.array(), buffer.arrayOffset(), 4); + + assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 36cfc0f..76a17e8 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -88,7 +88,7 @@ public class TestUtils { /** * Generate an array of random bytes * - * @param numBytes The size of the array + * @param size The size of the array */ public static byte[] randomBytes(int size) { byte[] bytes = new byte[size]; diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index dd39ff2..97927b6 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -255,8 +255,8 @@ object ConsoleProducer { class NewShinyProducer(producerConfig: ProducerConfig) extends Producer { val props = new Properties() props.put("metadata.broker.list", producerConfig.brokerList) - val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec - props.put("compression.codec", codec.toString) + val codec = if(producerConfig.compress) DefaultCompressionCodec.name else NoCompressionCodec.name + props.put("compression.type", codec.toString) props.put("send.buffer.bytes", producerConfig.socketBuffer.toString) props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString) props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString) diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala new file mode 100644 index 
0000000..33917a9 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api.test + +import kafka.zk.ZooKeeperTestHarness + +import scala.Array + +import java.util.{Properties, Collection, ArrayList} + +import org.scalatest.junit.JUnit3Suite +import org.junit.runners.Parameterized +import org.junit.runner.RunWith +import org.junit.runners.Parameterized.Parameters +import org.junit.{After, Before, Test} +import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig} +import kafka.utils.{Utils, TestUtils} +import org.junit.Assert._ +import kafka.api.FetchRequestBuilder +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.consumer.SimpleConsumer +import kafka.message.Message + +@RunWith(value = classOf[Parameterized]) +class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness { + private val brokerId1 = 0 + private val brokerId2 = 1 + private val ports = TestUtils.choosePorts(2) + private val (port1, port2) = (ports(0), ports(1)) + private var server1: KafkaServer = null + private var server2: KafkaServer = null + private var servers = List.empty[KafkaServer] + + private var consumer1: SimpleConsumer = null + private var consumer2: SimpleConsumer = null + + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) + props1.put("num.partitions", "4") + props2.put("num.partitions", "4") + private val config1 = new KafkaConfig(props1) + private val config2 = new KafkaConfig(props2) + + private val topic = "topic" + private val numRecords = 100 + + @Before + override def setUp() { + super.setUp() + // set up 2 brokers with 4 partitions each + server1 = TestUtils.createServer(config1) + server2 = TestUtils.createServer(config2) + servers = List(server1,server2) + + // TODO: we need to migrate to new consumers when 0.9 is final + consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "") + } + + @After + override def tearDown() { + server1.shutdown + server2.shutdown + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) + super.tearDown() + } + + /** + * testCompressionNormal + * + * Compressed messages should be able to sent and consumed correctly + */ + @Test + def testCompressionNormal() { + + val props = new Properties() + props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) + var producer = new KafkaProducer(props) + + try { + // create topic + val leaders = 
TestUtils.createTopic(zkClient, topic, 1, 1, servers) + val partition = 0 + + // make sure leaders exist + val leader = leaders(partition) + assertTrue("Leader for topic \"topic\" partition 1 should exist", leader.isDefined) + + // make sure the returned messages are correct + val responses = for (i <- 0 until numRecords) + yield producer.send(new ProducerRecord(topic, null, null, ("value" + i).getBytes)) + val futures = responses.toList + for ((future, offset) <- futures zip (0 until numRecords)) { + assertEquals(offset.toLong, future.get.offset) + } + + // make sure the fetched message count match + val fetchResponse = if(leader.get == server1.config.brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + } + val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer + assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size) + + for (i <- 0 until numRecords) { + assertEquals(new Message(bytes = ("value" + i).getBytes), messageSet(i).message) + assertEquals(i.toLong, messageSet(i).offset) + } + } finally { + if (producer != null) { + producer.close() + producer = null + } + } + } + + /** + * testCompressionOverflow + * + * If the compressed message cause overflow in writing data, the sending should + * retry with non-compression and succeed + */ + @Test + def testCompressionOverflow() { + + val props = new Properties() + props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) + props.put(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, 50.toString) + var producer = new KafkaProducer(props) + + try { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic, 1, 1, servers) + val partition = 0 + + // make sure leaders exist + val leader = leaders(partition) + assertTrue("Leader for topic \"topic\" partition 1 should exist", leader.isDefined) + + // let the producer with small batch size to send a few messages with no shared contents, + // the compressed size will be larger than the original size + val fewRecords = 3 + producer.send(new ProducerRecord(topic, null, "a".getBytes, ("1").getBytes)) + producer.send(new ProducerRecord(topic, null, "b".getBytes, ("2").getBytes)) + producer.send(new ProducerRecord(topic, null, "c".getBytes, ("3").getBytes)).get + + // make sure the fetched message count match + val fetchResponse = if(leader.get == server1.config.brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + } + val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer + assertEquals("Should have fetched " + fewRecords + " messages", fewRecords, messageSet.size) + + assertEquals(new Message("1".getBytes, "a".getBytes), messageSet(0).message) + assertEquals(new Message("2".getBytes, "b".getBytes), messageSet(1).message) + assertEquals(new Message("3".getBytes, "c".getBytes), messageSet(2).message) + + for (i <- 0 until fewRecords) + assertEquals(i.toLong, messageSet(i).offset) + } finally { + if (producer != null) { + producer.close() + producer = null + } + } + } +} + +object ProducerCompressionTest { + + // NOTE: Must return collection of Array[AnyRef] (NOT Array[Any]). 
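Editor's note: a rough Java counterpart of testCompressionOverflow above, for trying the fallback path outside the test harness. It assumes this patch is applied and a broker reachable at localhost:9092 (address and topic are placeholders); with a 50-byte partition batch size the compressed form of these tiny records may not fit, in which case the accumulator quietly rebuilds the batch uncompressed and the sends still succeed:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompressionOverflowExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // Deliberately tiny batch size to provoke the compressed-overflow case.
        props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, "50");

        KafkaProducer producer = new KafkaProducer(props);
        try {
            producer.send(new ProducerRecord("topic", null, "a".getBytes(), "1".getBytes()));
            producer.send(new ProducerRecord("topic", null, "b".getBytes(), "2".getBytes()));
            // Block on the last send so all three are flushed before closing.
            producer.send(new ProducerRecord("topic", null, "c".getBytes(), "3".getBytes())).get();
        } finally {
            producer.close();
        }
    }
}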
+ @Parameters + def parameters: Collection[Array[String]] = { + val list = new ArrayList[Array[String]]() + list.add(Array("gzip")) + list.add(Array("snappy")) + list + } +} diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index c002f5e..525a060 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -319,7 +319,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString) - val producer = new KafkaProducer(producerProps) override def doWork(): Unit = { @@ -335,5 +334,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness case e : Exception => failed = true } } + + override def shutdown(){ + super.shutdown() + producer.close + } } } \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 66ea76b..4e89f80 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -30,8 +30,9 @@ import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ -import java.util.Properties +import java.util.{Arrays, Properties} import java.lang.{Integer, IllegalArgumentException} +import org.apache.kafka.common.record.Record class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -76,15 +77,10 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { super.tearDown() } - class PrintOffsetCallback extends Callback { + class CheckErrorCallback extends Callback { def onCompletion(metadata: RecordMetadata, exception: Exception) { if (exception != null) fail("Send callback returns the following exception", exception) - try { - System.out.println("The message we just sent is marked as [" + metadata.partition + "] : " + metadata.offset); - } catch { - case e: Throwable => fail("Should succeed sending the message", e) - } } } @@ -100,7 +96,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) var producer = new KafkaProducer(props) - val callback = new PrintOffsetCallback + val callback = new CheckErrorCallback try { // create topic
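Editor's note: the CheckErrorCallback above replaces the old offset-printing callback because the test only needs to surface send errors. A minimal Java equivalent, assuming the new producer's Callback interface:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CheckErrorCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Only failures matter here; successful sends are ignored.
        if (exception != null)
            throw new RuntimeException("Send callback returned an exception", exception);
    }
}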