diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e4bc972..c19b499 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -64,6 +64,7 @@ public class KafkaProducer implements Producer { private final Sender sender; private final Metrics metrics; private final Thread ioThread; + private final CompressionType compressionType; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -93,6 +94,7 @@ public class KafkaProducer implements Producer { config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG); + this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), @@ -214,7 +216,7 @@ public class KafkaProducer implements Producer { int partition = partitioner.partition(record, cluster); ensureValidSize(record.key(), record.value()); TopicPartition tp = new TopicPartition(record.topic(), partition); - FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); + FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback); this.sender.wakeup(); return future; } catch (Exception e) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index d8e35e7..69c729e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -136,6 +136,11 @@ public class ProducerConfig extends AbstractConfig { public static final String MAX_RETRIES_CONFIG = "request.retries"; /** + * The compression type for all data generated. The default is none (i.e. no compression) + */ + public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; + + /** * Should we register the Kafka metrics as JMX mbeans? 
*/ public static final String ENABLE_JMX_CONFIG = "enable.jmx"; @@ -159,8 +164,9 @@ public class ProducerConfig extends AbstractConfig { .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") - .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") - .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), ""); + .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "") + .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah") + .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, ""); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ce5cf27..9f46dab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -26,14 +26,13 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; -import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.*; import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import javax.print.attribute.standard.Compression; + /** * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} * instances to be sent to the server. @@ -91,12 +90,12 @@ public final class RecordAccumulator { } }); metrics.addMetric("buffer_available_bytes", - "The total amount of buffer memory that is available (not currently used for buffering records).", - new Measurable() { - public double measure(MetricConfig config, long now) { - return free.availableMemory(); - } - }); + "The total amount of buffer memory that is available (not currently used for buffering records).", + new Measurable() { + public double measure(MetricConfig config, long now) { + return free.availableMemory(); + } + }); } /** @@ -118,7 +117,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch batch = dq.peekLast(); if (batch != null) { - FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback); + FutureRecordMetadata future = batch.tryAppend(key, value, callback); if (future != null) return future; } @@ -130,7 +129,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch first = dq.peekLast(); if (first != null) { - FutureRecordMetadata future = first.tryAppend(key, value, compression, callback); + FutureRecordMetadata future = first.tryAppend(key, value, callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen // often... 
@@ -138,8 +137,23 @@ public final class RecordAccumulator { return future; } } - RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds()); - FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback)); + MemoryRecords records = new MemoryRecords(buffer, compression); + RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); + FutureRecordMetadata future = batch.tryAppend(key, value, callback); + + // when compression is enabled, it is possible that the compressed message size is larger than + // its uncompressed counterpart, causing the single compressed message to be bigger than the allocated + // buffer; under this case, we will re-build the batch as uncompressed version and re-append + if (future == null) { + if (compression == CompressionType.NONE) + throw new IllegalStateException("Append record failed even with enough room left in the memory buffer."); + // TODO: logging please + buffer.clear(); + records = new MemoryRecords(buffer, CompressionType.NONE); + batch = new RecordBatch(tp, records, time.milliseconds()); + future = Utils.notNull(batch.tryAppend(key, value, callback)); + } + dq.addLast(batch); return future; } @@ -178,8 +192,10 @@ public final class RecordAccumulator { if (batch != null) { boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining(); boolean expired = now - batch.created >= lingerMs; - if (full | expired | exhausted | closed) + if (full | expired | exhausted | closed) { + batch.records.ready(); ready.add(batch.topicPartition); + } } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index eb16f6d..033719c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -18,7 +18,6 @@ import java.util.List; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; /** @@ -48,11 +47,13 @@ public final class RecordBatch { * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. 
*/ - public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) { + public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { if (!this.records.hasRoomFor(key, value)) { return null; } else { - this.records.append(0L, key, value, compression); + if (!this.records.append(0L, key, value)) + return null; + FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) thunks.add(new Thunk(callback, this.recordCount)); @@ -65,7 +66,7 @@ public final class RecordBatch { * Complete the request * * @param offset The offset - * @param errorCode The error code or 0 if no error + * @param exception The exception returned or null if no exception */ public void done(long offset, RuntimeException exception) { this.produceFuture.done(topicPartition, offset, exception); diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java new file mode 100644 index 0000000..f7577dc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * A byte buffer backed input outputStream + */ +public class ByteBufferInputStream extends InputStream { + + ByteBuffer buffer; + + public ByteBufferInputStream(ByteBuffer buffer) { + this.buffer = buffer; + } + + public int read() throws IOException { + if (!buffer.hasRemaining()) { + return -1; + } + return buffer.get() & 0xFF; + } + + public int read(byte[] bytes, int off, int len) throws IOException { + if (!buffer.hasRemaining()) { + return -1; + } + + len = Math.min(len, buffer.remaining()); + buffer.get(bytes, off, len); + return len; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java new file mode 100644 index 0000000..8cf759c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + + +/** + * A byte buffer backed output outputStream + */ +public class ByteBufferOutputStream extends OutputStream { + + ByteBuffer buffer; + + public ByteBufferOutputStream(ByteBuffer buffer) { + this.buffer = buffer; + } + + public void write(int b) throws IOException { + buffer.put((byte) b); + } + + public void write(byte[] bytes, int off, int len) throws IOException { + buffer.put(bytes, off, len); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 906da02..3bf1639 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -16,6 +16,13 @@ */ package org.apache.kafka.common.record; +import java.io.*; +import java.nio.ByteBuffer; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.apache.kafka.common.KafkaException; + /** * The compression type to use */ @@ -30,6 +37,68 @@ public enum CompressionType { this.name = name; } + public DataOutputStream wrapForOutput(ByteBuffer buffer) { + try { + switch (id) { + case 0: + return new DataOutputStream( + new ByteBufferOutputStream(buffer) + ); + case 1: + return new DataOutputStream( + new GZIPOutputStream( + new ByteBufferOutputStream(buffer) + )); + case 2: + ClassLoader classLoader = ClassLoader.class.getClassLoader(); + + try { + Class SnappyOutputStream = classLoader.loadClass("org.xerial.snappy.SnappyOutputStream"); + OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class) + .newInstance(new ByteBufferOutputStream(buffer)); + return new DataOutputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type id: " + id); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + + public DataInputStream wrapForInput(ByteBuffer buffer) { + try { + switch (id) { + case 0: + return new DataInputStream( + new ByteBufferInputStream(buffer) + ); + case 1: + return new DataInputStream( + new GZIPInputStream( + new ByteBufferInputStream(buffer) + )); + case 2: + ClassLoader classLoader = ClassLoader.class.getClassLoader(); + + try { + Class SnappyInputStream = classLoader.loadClass("org.xerial.snappy.SnappyInputStream"); + InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class) + .newInstance(new ByteBufferInputStream(buffer)); + return new DataInputStream(stream); + } catch (Exception e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type id: " + id); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + public static CompressionType forId(int id) { switch (id) { case 0: @@ -53,4 +122,5 @@ public enum CompressionType { else throw new IllegalArgumentException("Unknown compression 
name: " + name); } + } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 9d8935f..cdf2f73 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -16,12 +16,16 @@ */ package org.apache.kafka.common.record; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.util.Iterator; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.Utils; /** @@ -30,41 +34,138 @@ import org.apache.kafka.common.utils.AbstractIterator; public class MemoryRecords implements Records { private final ByteBuffer buffer; + private final CompressionType type; + private final int initialPos; + private long numRecords; + + private DataOutputStream appendStream; public MemoryRecords(int size) { - this(ByteBuffer.allocate(size)); + this(ByteBuffer.allocate(size), CompressionType.NONE); } - public MemoryRecords(ByteBuffer buffer) { + public MemoryRecords(ByteBuffer buffer, CompressionType type) { this.buffer = buffer; + this.type = type; + + this.initialPos = buffer.position(); + this.numRecords = 0L; + } + + private void maybeInitForAppend() { + if (appendStream == null) { + if (type != CompressionType.NONE) { + // for compressed records, first write the place-holder header + buffer.position(initialPos + LOG_OVERHEAD); + Record.write(buffer, null, null, type); + // move the starting position for value payload + // TODO: assuming key==null + buffer.position(initialPos + LOG_OVERHEAD + Record.RECORD_OVERHEAD); + } + // create the stream + appendStream = type.wrapForOutput(buffer); + } } /** - * Append the given record and offset to the buffer + * Close the append stream */ - public void append(long offset, Record record) { - buffer.putLong(offset); - buffer.putInt(record.size()); - buffer.put(record.buffer()); - record.buffer().rewind(); + private void closeForAppend() { + try { + appendStream.close(); + } catch (IOException e) { + throw new KafkaException(e); + } } + + /** - * Append a new record and offset to the buffer + * Try to append the given record and offset to the buffer + * @return true if append succeeds, false otherwise */ - public void append(long offset, byte[] key, byte[] value, CompressionType type) { - buffer.putLong(offset); - buffer.putInt(Record.recordSize(key, value)); - Record.write(this.buffer, key, value, type); + public boolean append(long offset, Record record) { + maybeInitForAppend(); + int pos = buffer.position(); + // when the compressed message size is bigger than its uncompressed counterpart, + // the underlying byte buffer may throw a BufferOverflowException, + // we need to catch this and return false indicating append failure + try { + appendStream.writeLong(offset); + appendStream.writeInt(record.size()); + ByteBuffer recordBuffer = record.buffer(); + appendStream.write(recordBuffer.array(), recordBuffer.arrayOffset(), recordBuffer.limit()); + } catch (IOException e) { + // TODO: logging please + // rewind partial writes + this.buffer.position(pos); + return false; + } + numRecords++; + return true; + } + + /** + * Try to append a new record and offset to the buffer + * @return true if append succeeds, false otherwise + */ + public boolean append(long 
offset, byte[] key, byte[] value) { + maybeInitForAppend(); + int pos = buffer.position(); + // when the compressed message size is bigger than its uncompressed counterpart, + // the underlying byte buffer may throw a BufferOverflowException, + // we need to catch this and return false indicating append failure + try { + appendStream.writeLong(offset); + appendStream.writeInt(Record.recordSize(key, value)); + Record.write(appendStream, key, value, CompressionType.NONE); + } catch (IOException e) { + // TODO: logging please + // rewind partial writes + this.buffer.position(pos); + return false; + } + numRecords++; + return true; } /** * Check if we have room for a new record containing the given key/value pair + * + * Note that even if this function returns true, the following append may still fail since + * the space is calculated assuming no-compression used. */ public boolean hasRoomFor(byte[] key, byte[] value) { return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value); } + /** + * Mark this record set as ready + */ + public void ready() { + maybeInitForAppend(); + closeForAppend(); + + // For compressed messages wrap them as a single shallow message + if (type != CompressionType.NONE) { + int pos = buffer.position(); + // compute the fill the value size + // TODO: assuming key==null + int valueSize = pos - initialPos - LOG_OVERHEAD - Record.RECORD_OVERHEAD; + buffer.putInt(initialPos + LOG_OVERHEAD + Record.KEY_OFFSET, valueSize); + // compute and fill the crc at the beginning of the message + long crc = Record.computeChecksum(buffer, + initialPos + LOG_OVERHEAD + Record.MAGIC_OFFSET, + pos - initialPos - LOG_OVERHEAD - Record.MAGIC_OFFSET); + Utils.writeUnsignedInt(buffer, initialPos + LOG_OVERHEAD + Record.CRC_OFFSET, crc); + // write the header, for the end offset write as number of records - 1 + buffer.position(initialPos); + buffer.putLong(numRecords - 1); + buffer.putInt(pos - initialPos - LOG_OVERHEAD); + buffer.position(pos); + } + } + /** Write the records in this set to the given channel */ public int writeTo(GatheringByteChannel channel) throws IOException { return channel.write(buffer); @@ -93,27 +194,86 @@ public class MemoryRecords implements Records { public static class RecordsIterator extends AbstractIterator { private final ByteBuffer buffer; + private CompressedRecordsIterator innerIter; + public RecordsIterator(ByteBuffer buffer) { ByteBuffer copy = buffer.duplicate(); copy.flip(); this.buffer = copy; } + /* TODO: allow shallow iteration */ @Override protected LogEntry makeNext() { - if (buffer.remaining() < Records.LOG_OVERHEAD) - return allDone(); - long offset = buffer.getLong(); - int size = buffer.getInt(); - if (size < 0) - throw new IllegalStateException("Record with size " + size); - if (buffer.remaining() < size) - return allDone(); - ByteBuffer rec = buffer.slice(); - rec.limit(size); - this.buffer.position(this.buffer.position() + size); - return new LogEntry(offset, new Record(rec)); + if (innerDone()) { + if (buffer.remaining() < Records.LOG_OVERHEAD) + return allDone(); + long offset = buffer.getLong(); + int size = buffer.getInt(); + if (size < 0) + throw new IllegalStateException("Record with size " + size); + if (buffer.remaining() < size) + return allDone(); + ByteBuffer rec = buffer.slice(); + rec.limit(size); + buffer.position(buffer.position() + size); + LogEntry entry = new LogEntry(offset, new Record(rec)); + entry.record().ensureValid(); + + CompressionType compression = entry.record().compressionType(); + if 
(compression != CompressionType.NONE) { + // init the inner iterator with the value payload of the message + ByteBuffer value = entry.record().value(); + innerIter = new CompressedRecordsIterator(value, compression); + // the new compressed records should at least has one message + return innerIter.next(); + } else { + return entry; + } + } else { + return innerIter.next(); + } } - } + private boolean innerDone() { + return (innerIter == null || !innerIter.hasNext()); + } + + public static class CompressedRecordsIterator extends AbstractIterator { + private DataInputStream stream; + + public CompressedRecordsIterator(ByteBuffer buffer, CompressionType compression) { + ByteBuffer copy = buffer.duplicate(); + stream = compression.wrapForInput(copy); + } + + @Override + protected LogEntry makeNext() { + try { + // try to read the offset and record size first + byte[] overheadBuffer = new byte[LOG_OVERHEAD]; + int ret = stream.read(overheadBuffer, 0, LOG_OVERHEAD); + if (ret == -1) + return allDone(); + else if (ret != LOG_OVERHEAD) + throw new IllegalStateException("Invalid compressed record"); + ByteBuffer buffer = ByteBuffer.wrap(overheadBuffer); + long offset = buffer.getLong(); + int size = buffer.getInt(); + if (size < 0) + throw new IllegalStateException("Record with size " + size); + // try read the record + byte[] recordBuffer = new byte[size]; + ret = stream.read(recordBuffer, 0, size); + if (ret != size) + throw new IllegalStateException("Invalid compressed record"); + buffer = ByteBuffer.wrap(recordBuffer); + buffer.limit(size); + return new LogEntry(offset, new Record(buffer)); + } catch (IOException e) { + throw new KafkaException(e); + } + } + } + } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index f1dc977..da2e99e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -16,8 +16,13 @@ */ package org.apache.kafka.common.record; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.Crc32; import org.apache.kafka.common.utils.Utils; @@ -40,13 +45,15 @@ public final class Record { public static final int KEY_OFFSET = KEY_SIZE_OFFSET + KEY_SIZE_LENGTH; public static final int VALUE_SIZE_LENGTH = 4; - /** The amount of overhead bytes in a record */ - public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH; + /** + * The size for the record header + */ + public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH; /** - * The minimum valid size for the record header + * The amount of overhead bytes in a record */ - public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; + public static final int RECORD_OVERHEAD = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; /** * The current "magic" value @@ -80,9 +87,18 @@ public final class Record { * @param valueSize The size of the payload to use */ public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { - this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? 
valueSize - : value.length - valueOffset))); - write(this.buffer, key, value, codec, valueOffset, valueSize); + this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, + value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset))); + + try { + DataOutputStream stream = new DataOutputStream( + new ByteBufferOutputStream(this.buffer)); + long crc = computeChecksum(key, value, codec, valueOffset, valueSize); + byte attributes = computeAttributes(codec); + write(stream, crc, attributes, key, value, valueOffset, valueSize); + } catch (IOException e) { + throw new KafkaException(e); + } this.buffer.rewind(); } @@ -102,40 +118,57 @@ public final class Record { this(null, value, CompressionType.NONE); } - public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { - // skip crc, we will fill that in at the end - int pos = buffer.position(); - buffer.position(pos + MAGIC_OFFSET); - buffer.put(CURRENT_MAGIC_VALUE); + public static byte computeAttributes(CompressionType codec) { byte attributes = 0; if (codec.id > 0) attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id)); - buffer.put(attributes); + return attributes; + } + + public static void write(DataOutputStream stream, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) throws IOException { + // write crc + stream.writeInt((int) (crc & 0xffffffffL)); + // write magic value + stream.writeByte(CURRENT_MAGIC_VALUE); + // write attributes + stream.writeByte(attributes); // write the key if (key == null) { - buffer.putInt(-1); + stream.writeInt(-1); } else { - buffer.putInt(key.length); - buffer.put(key, 0, key.length); + stream.writeInt(key.length); + stream.write(key, 0, key.length); } // write the value if (value == null) { - buffer.putInt(-1); + stream.writeInt(-1); } else { int size = valueSize >= 0 ? 
valueSize : (value.length - valueOffset); - buffer.putInt(size); - buffer.put(value, valueOffset, size); + stream.writeInt(size); + stream.write(value, valueOffset, size); } + } - // now compute the checksum and fill it in - long crc = computeChecksum(buffer, - buffer.arrayOffset() + pos + MAGIC_OFFSET, - buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset()); - Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc); + public static Record read(DataInputStream stream) throws IOException { + return null; + } + + public static void write(DataOutputStream stream, byte[] key, byte[] value, CompressionType codec) throws IOException { + long crc = computeChecksum(key, value, codec, 0, -1); + byte attributes = computeAttributes(codec); + write(stream, crc, attributes, key, value, 0, -1); } public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) { - write(buffer, key, value, codec, 0, -1); + try { + DataOutputStream stream = new DataOutputStream( + new ByteBufferOutputStream(buffer)); + long crc = computeChecksum(key, value, codec, 0, -1); + byte attributes = computeAttributes(codec); + write(stream, crc, attributes, key, value, 0, -1); + } catch (IOException e) { + throw new KafkaException(e); + } } public static int recordSize(byte[] key, byte[] value) { @@ -154,9 +187,41 @@ public final class Record { * Compute the checksum of the record from the record contents */ public static long computeChecksum(ByteBuffer buffer, int position, int size) { - return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset()); + Crc32 crc = new Crc32(); + crc.update(buffer.array(), buffer.arrayOffset() + position, size); + return crc.getValue(); + } + + /** + * Compute the checksum of the record from the attributes, key and value payloads + */ + public static long computeChecksum(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { + // TODO: we can remove this duplicate logic when we change the message format to put crc at the end + Crc32 crc = new Crc32(); + crc.update(CURRENT_MAGIC_VALUE); + byte attributes = 0; + if (codec.id > 0) + attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id)); + crc.update(attributes); + // update for the key + if (key == null) { + Utils.updateCrc32(crc, -1); + } else { + Utils.updateCrc32(crc, key.length); + crc.update(key, 0, key.length); + } + // update for the value + if (value == null) { + Utils.updateCrc32(crc, -1); + } else { + int size = valueSize >= 0 ? 
valueSize : (value.length - valueOffset); + Utils.updateCrc32(crc, size); + crc.update(value, valueOffset, size); + } + return crc.getValue(); } + /** * Compute the checksum of the record from the record contents */ diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 9c34e7d..9697d51 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -105,8 +105,8 @@ public class Utils { } /** - * Compute the CRC32 of the segment of the byte array given by the specificed size and offset - * + * Compute the CRC32 of record given the key and value payload + * * @param bytes The bytes to checksum * @param offset the offset at which to begin checksumming * @param size the number of bytes to checksum @@ -119,6 +119,16 @@ public class Utils { } /** + * Update the CRC32 given an integer + */ + public static void updateCrc32(Crc32 crc, int input) { + crc.update((byte) input >> 24); + crc.update((byte) input >> 16); + crc.update((byte) input >> 8); + crc.update((byte) input /* >> 0 */); + } + + /** * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!). */ diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index b0745b5..8e59110 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -26,26 +26,26 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import org.apache.kafka.common.record.LogEntry; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; import org.junit.Test; public class MemoryRecordsTest { @Test public void testIterator() { - MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024)); - MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024)); + MemoryRecords recs1 = new MemoryRecords(1024); + MemoryRecords recs2 = new MemoryRecords(1024); List list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()), new Record("b".getBytes(), "2".getBytes()), new Record("c".getBytes(), "3".getBytes())); for (int i = 0; i < list.size(); i++) { Record r = list.get(i); recs1.append(i, r); - recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType()); + recs2.append(i, toArray(r.key()), toArray(r.value())); } + recs1.ready(); + recs2.ready(); + for (int iteration = 0; iteration < 2; iteration++) { for (MemoryRecords recs : Arrays.asList(recs1, recs2)) { Iterator iter = recs.iterator(); @@ -54,10 +54,40 @@ public class MemoryRecordsTest { LogEntry entry = iter.next(); assertEquals((long) i, entry.offset()); assertEquals(list.get(i), entry.record()); + entry.record().ensureValid(); } assertFalse(iter.hasNext()); } } } + @Test + public void testGZIP() { + MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024), CompressionType.GZIP); + MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024), CompressionType.GZIP); + List list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()), + new Record("b".getBytes(), "2".getBytes()), + new Record("c".getBytes(), "3".getBytes())); + for (int i = 0; i < list.size(); i++) { + Record r = list.get(i); + 
recs1.append(i, r); + recs2.append(i, toArray(r.key()), toArray(r.value())); + } + + recs1.ready(); + recs2.ready(); + + for (MemoryRecords recs : Arrays.asList(recs1, recs2)) { + //for (MemoryRecords recs : Arrays.asList(recs1)) { + Iterator iter = recs.iterator(); + for (int i = 0; i < list.size(); i++) { + assertTrue(iter.hasNext()); + LogEntry entry = iter.next(); + assertEquals((long) i, entry.offset()); + assertEquals(list.get(i), entry.record()); + entry.record().ensureValid(); + } + assertFalse(iter.hasNext()); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java index ae54d67..3998696 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java @@ -39,12 +39,16 @@ import org.junit.runners.Parameterized.Parameters; public class RecordTest { private ByteBuffer key; + private byte[] keyArray; private ByteBuffer value; + private byte[] valueArray; private CompressionType compression; private Record record; public RecordTest(byte[] key, byte[] value, CompressionType compression) { + this.keyArray = key; this.key = key == null ? null : ByteBuffer.wrap(key); + this.valueArray = value; this.value = value == null ? null : ByteBuffer.wrap(value); this.compression = compression; this.record = new Record(key, value, compression); @@ -66,6 +70,8 @@ public class RecordTest { @Test public void testChecksum() { assertEquals(record.checksum(), record.computeChecksum()); + assertEquals(record.checksum(), record.computeChecksum( + this.keyArray, this.valueArray, this.compression, 0, -1)); assertTrue(record.isValid()); for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) { Record copy = copyOf(record); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java new file mode 100644 index 0000000..46713d7 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.Crc32; +import org.junit.Test; + +import java.lang.System; + +public class CrcTest { + + @Test + public void testUpdate() { + final byte bytes[] = "Any String you want".getBytes(); + final int len = bytes.length; + + System.out.println(len + " " + len/2 + " " + (len/2+1)); + + Crc32 crc1 = new Crc32(); + Crc32 crc2 = new Crc32(); + Crc32 crc3 = new Crc32(); + + crc1.update(bytes, 0, len); + for(int i = 0; i < len; i++) + crc2.update(bytes[i]); + crc3.update(bytes, 0, len/2); + crc3.update(bytes, len/2, len-len/2); + + assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue()); + assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 36cfc0f..76a17e8 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -88,7 +88,7 @@ public class TestUtils { /** * Generate an array of random bytes * - * @param numBytes The size of the array + * @param size The size of the array */ public static byte[] randomBytes(int size) { byte[] bytes = new byte[size]; diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 34baa8c..127545a 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -272,4 +272,51 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { } } } + + @Test + def testCompressedRecord() { + + val props = new Properties() + props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + var producer = new KafkaProducer(props) + + try { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic, 1, 1, servers) + val partition = 0 + + // make sure leaders exist + val leader = leaders(partition) + assertTrue("Leader for topic \"topic\" partition 1 should exist", leader.isDefined) + + val responses = + for (i <- 0 until numRecords) + yield producer.send(new ProducerRecord(topic, null, "key".getBytes, ("value" + i).getBytes)) + val futures = responses.toList + + // make sure the returned messages are correct + for ((future, offset) <- futures zip (0 until numRecords)) { + assertEquals(offset.toLong, future.get.offset) + } + + // make sure the fetched message count match + val fetchResponse = if(leader.get == server1.config.brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + } + val messageSet = fetchResponse.messageSet(topic, partition).iterator.toBuffer + assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet.size) + + for (i <- 0 to numRecords - 1) + assertEquals(i.toLong, messageSet(i).offset) + + } finally { + if (producer != null) { + producer.close() + producer = null + } + } + } } \ No newline at end of file
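
A few usage notes and sketches on the patch follow.

The producer-facing change is the new compression.type setting, wired from ProducerConfig through KafkaProducer into RecordAccumulator.append(), which previously hard-coded CompressionType.NONE. A minimal usage sketch against the byte[]-based producer API this patch targets (broker address and topic name are placeholders):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CompressedProducerExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");      // "none" (default), "gzip" or "snappy"
            KafkaProducer producer = new KafkaProducer(props);
            try {
                // every batch built by the accumulator now inherits the configured compression type
                producer.send(new ProducerRecord("test-topic", null, "key".getBytes(), "value".getBytes())).get();
            } finally {
                producer.close();
            }
        }
    }

CompressionType.forName() resolves the string in the KafkaProducer constructor, so an unknown name fails fast with an IllegalArgumentException at construction time rather than on the first send.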
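
A subtle consequence inside RecordAccumulator.append(): hasRoomFor() still sizes its check on the uncompressed record, so tryAppend() can now return null even after hasRoomFor() said yes — for small or incompressible payloads the compressed form can be larger than the original. The new fallback path rebuilds the batch uncompressed in the same buffer; condensed here for reference, with the intent spelled out in comments (dq, tp, key, value, callback, compression and time have the same meaning as in the patch):

    MemoryRecords records = new MemoryRecords(buffer, compression);
    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
    FutureRecordMetadata future = batch.tryAppend(key, value, callback);
    if (future == null) {
        // with no compression the buffer was sized for this record, so a null here is a bug
        if (compression == CompressionType.NONE)
            throw new IllegalStateException("Append record failed even with enough room left in the memory buffer.");
        // the single record grew under compression: start over with an uncompressed batch in the same buffer
        buffer.clear();
        records = new MemoryRecords(buffer, CompressionType.NONE);
        batch = new RecordBatch(tp, records, time.milliseconds());
        future = Utils.notNull(batch.tryAppend(key, value, callback));
    }
    dq.addLast(batch);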
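
CompressionType.wrapForOutput()/wrapForInput() implement compression by stacking streams: the new ByteBufferOutputStream/ByteBufferInputStream adapt a ByteBuffer to the stream API, and the JDK GZIP streams (or reflectively loaded Snappy streams) sit on top. A self-contained round-trip sketch of that layering, using only JDK classes plus the two adapters added here:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.nio.ByteBuffer;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    import org.apache.kafka.common.record.ByteBufferInputStream;
    import org.apache.kafka.common.record.ByteBufferOutputStream;

    public class StreamWrappingDemo {
        public static void main(String[] args) throws Exception {
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            // write side: compressed bytes land directly in the buffer
            DataOutputStream out = new DataOutputStream(
                    new GZIPOutputStream(new ByteBufferOutputStream(buffer)));
            out.writeLong(0L);             // offset, as MemoryRecords.append() writes it
            out.writeInt(5);               // record size field
            out.write("hello".getBytes());
            out.close();                   // flushes the GZIP trailer; MemoryRecords.ready() relies on this

            // read side: duplicate and flip, as RecordsIterator does, then decompress
            ByteBuffer copy = buffer.duplicate();
            copy.flip();
            DataInputStream in = new DataInputStream(
                    new GZIPInputStream(new ByteBufferInputStream(copy)));
            System.out.println(in.readLong() + " / " + in.readInt()); // prints 0 / 5
        }
    }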
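
One caveat in the Snappy branches of wrapForOutput()/wrapForInput(): ClassLoader.class.getClassLoader() is the bootstrap loader and returns null, so classLoader.loadClass(...) throws a NullPointerException (surfaced as a KafkaException) even when snappy-java is on the classpath. A corrected sketch of the same reflection idea — the class and constructor are the real org.xerial.snappy ones, the helper name is illustrative and not part of the patch:

    import java.io.DataOutputStream;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.record.ByteBufferOutputStream;

    public final class SnappyWrapping {
        public static DataOutputStream wrapWithSnappy(ByteBuffer buffer) {
            try {
                // Class.forName uses the caller's class loader, which can see the optional snappy-java jar
                Class<?> clazz = Class.forName("org.xerial.snappy.SnappyOutputStream");
                OutputStream stream = (OutputStream) clazz.getConstructor(OutputStream.class)
                        .newInstance(new ByteBufferOutputStream(buffer));
                return new DataOutputStream(stream);
            } catch (Exception e) {
                // snappy-java is optional; surface a clear error if it is missing
                throw new KafkaException(e);
            }
        }
    }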
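
MemoryRecords now has a two-phase lifecycle: append() writes through the (possibly compressing) stream, and ready() closes that stream and back-fills the shallow wrapper message — its value size, its CRC (computed from the magic byte onward), and a wrapper offset of numRecords - 1. That is why RecordAccumulator marks batches with batch.records.ready() before handing them to the sender, and why both unit tests call ready() before iterating. A usage sketch mirroring the new tests (payloads are arbitrary; the raw Iterator matches the tests' usage, and the boolean return of append() is ignored for brevity):

    import java.nio.ByteBuffer;
    import java.util.Iterator;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class MemoryRecordsUsage {
        public static void main(String[] args) {
            MemoryRecords records = new MemoryRecords(ByteBuffer.allocate(1024), CompressionType.GZIP);
            records.append(0L, "a".getBytes(), "1".getBytes());
            records.append(1L, "b".getBytes(), "2".getBytes());
            records.ready(); // close the GZIP stream and write the wrapper header; required before iterating

            // iteration is "deep": the wrapper is decompressed and the inner records are returned one by one
            Iterator iter = records.iterator();
            while (iter.hasNext()) {
                LogEntry entry = (LogEntry) iter.next();
                entry.record().ensureValid(); // per-record CRC check, as in MemoryRecordsTest
                System.out.println(entry.offset() + " -> " + entry.record());
            }
        }
    }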
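
For reference, the shallow wrapper that ready() finishes has the layout below, derived from the constants in Record/Records (LOG_OVERHEAD = 12, RECORD_OVERHEAD = 14) and from the TODO assumption that the wrapper's key is null:

    offset        8 bytes   numRecords - 1 (written as the end offset of the set)
    size          4 bytes   pos - initialPos - LOG_OVERHEAD
    crc           4 bytes   CRC-32 of everything from the magic byte to the end of the value
    magic         1 byte    CURRENT_MAGIC_VALUE
    attributes    1 byte    compression codec id in the low bits
    key length    4 bytes   -1 (null key, written by the place-holder header in maybeInitForAppend())
    value length  4 bytes   pos - initialPos - LOG_OVERHEAD - RECORD_OVERHEAD
    value                   the compressed stream of [offset][size][record] entries appended so far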
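
Lastly, in Utils.updateCrc32() the cast binds tighter than the shift, so crc.update((byte) input >> 24) parses as ((byte) input) >> 24 and the helper effectively feeds the CRC from the low byte of input. The result happens to agree with DataOutputStream.writeInt() for -1 and for values below 128 — which is why the small keys and values in the existing tests still pass — but not for larger key or value lengths. A corrected sketch of the helper:

    public static void updateCrc32(Crc32 crc, int input) {
        // shift first, then truncate each byte, matching how DataOutputStream.writeInt() serializes the value
        crc.update((byte) (input >> 24));
        crc.update((byte) (input >> 16));
        crc.update((byte) (input >> 8));
        crc.update((byte) input /* >> 0 */);
    }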