diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e4bc972..c19b499 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -64,6 +64,7 @@ public class KafkaProducer implements Producer { private final Sender sender; private final Metrics metrics; private final Thread ioThread; + private final CompressionType compressionType; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -93,6 +94,7 @@ public class KafkaProducer implements Producer { config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG)); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG); + this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), @@ -214,7 +216,7 @@ public class KafkaProducer implements Producer { int partition = partitioner.partition(record, cluster); ensureValidSize(record.key(), record.value()); TopicPartition tp = new TopicPartition(record.topic(), partition); - FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); + FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback); this.sender.wakeup(); return future; } catch (Exception e) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index d8e35e7..69c729e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -136,6 +136,11 @@ public class ProducerConfig extends AbstractConfig { public static final String MAX_RETRIES_CONFIG = "request.retries"; /** + * The compression type for all data generated. The default is none (i.e. no compression) + */ + public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; + + /** * Should we register the Kafka metrics as JMX mbeans? 
*/ public static final String ENABLE_JMX_CONFIG = "enable.jmx"; @@ -159,8 +164,9 @@ public class ProducerConfig extends AbstractConfig { .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") - .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") - .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), ""); + .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "") + .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah") + .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, ""); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ce5cf27..a408b22 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -26,10 +26,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; -import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.*; import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -91,12 +88,12 @@ public final class RecordAccumulator { } }); metrics.addMetric("buffer_available_bytes", - "The total amount of buffer memory that is available (not currently used for buffering records).", - new Measurable() { - public double measure(MetricConfig config, long now) { - return free.availableMemory(); - } - }); + "The total amount of buffer memory that is available (not currently used for buffering records).", + new Measurable() { + public double measure(MetricConfig config, long now) { + return free.availableMemory(); + } + }); } /** @@ -118,7 +115,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch batch = dq.peekLast(); if (batch != null) { - FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback); + FutureRecordMetadata future = batch.tryAppend(key, value, callback); if (future != null) return future; } @@ -130,7 +127,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch first = dq.peekLast(); if (first != null) { - FutureRecordMetadata future = first.tryAppend(key, value, compression, callback); + FutureRecordMetadata future = first.tryAppend(key, value, callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen // often... @@ -138,8 +135,21 @@ public final class RecordAccumulator { return future; } } - RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds()); - FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback)); + MemoryRecords records = compression == CompressionType.NONE ? 
new MemoryRecords(buffer): new CompressedMemoryRecords(buffer, compression); + RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); + FutureRecordMetadata future = batch.tryAppend(key, value, callback); + + // when compression is enabled, it is possible that the compressed message size is larger than + // its uncompressed counterpart, causing the single compressed message to be bigger than the allocated + // buffer; under this case, we will re-build the batch as uncompressed version and re-append + if (future == null) { + // TODO: logging please + buffer.clear(); + records = new MemoryRecords(buffer); + batch = new RecordBatch(tp, records, time.milliseconds()); + future = Utils.notNull(batch.tryAppend(key, value, callback)); + } + dq.addLast(batch); return future; } @@ -178,8 +188,10 @@ public final class RecordAccumulator { if (batch != null) { boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining(); boolean expired = now - batch.created >= lingerMs; - if (full | expired | exhausted | closed) + if (full | expired | exhausted | closed) { + batch.records.close(); ready.add(batch.topicPartition); + } } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index eb16f6d..4984d26 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.clients.producer.internals; +import java.nio.BufferOverflowException; import java.util.ArrayList; import java.util.List; @@ -48,11 +49,13 @@ public final class RecordBatch { * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. */ - public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) { + public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { if (!this.records.hasRoomFor(key, value)) { return null; } else { - this.records.append(0L, key, value, compression); + if (!this.records.append(0L, key, value)) + return null; + FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) thunks.add(new Thunk(callback, this.recordCount)); @@ -65,7 +68,7 @@ public final class RecordBatch { * Complete the request * * @param offset The offset - * @param errorCode The error code or 0 if no error + * @param exception The exception returned or null if no exception */ public void done(long offset, RuntimeException exception) { this.produceFuture.done(topicPartition, offset, exception); diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java new file mode 100644 index 0000000..f7577dc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * A byte buffer backed input outputStream + */ +public class ByteBufferInputStream extends InputStream { + + ByteBuffer buffer; + + public ByteBufferInputStream(ByteBuffer buffer) { + this.buffer = buffer; + } + + public int read() throws IOException { + if (!buffer.hasRemaining()) { + return -1; + } + return buffer.get() & 0xFF; + } + + public int read(byte[] bytes, int off, int len) throws IOException { + if (!buffer.hasRemaining()) { + return -1; + } + + len = Math.min(len, buffer.remaining()); + buffer.get(bytes, off, len); + return len; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java new file mode 100644 index 0000000..8cf759c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + + +/** + * A byte buffer backed output outputStream + */ +public class ByteBufferOutputStream extends OutputStream { + + ByteBuffer buffer; + + public ByteBufferOutputStream(ByteBuffer buffer) { + this.buffer = buffer; + } + + public void write(int b) throws IOException { + buffer.put((byte) b); + } + + public void write(byte[] bytes, int off, int len) throws IOException { + buffer.put(bytes, off, len); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java new file mode 100644 index 0000000..102d7d1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.Iterator; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.common.utils.Utils; + + +/** + * A {@link Records} implementation with in-place compression backed by a ByteBuffer. + */ +public class CompressedMemoryRecords extends MemoryRecords { + + private DataOutputStream stream; + private int startingPos; + private CompressionType compression; + private long endOffset = -1L; + + + public CompressedMemoryRecords(ByteBuffer buffer, CompressionType compression) { + super(buffer); + this.compression = compression; + } + + private void maybeInit() { + if (stream == null) { + startingPos = this.buffer.position(); + // write the attribute first + this.buffer.position(startingPos + LOG_OVERHEAD + Record.MAGIC_OFFSET); + byte attributes = Record.computeAttributes(compression); + this.buffer.put(attributes); + // create the stream + this.stream = compression.wrapForOutput(this.buffer); + } + } + + @Override + public boolean append(long offset, Record record) { + maybeInit(); + // when the compressed message size is bigger than its uncompressed counterpart, + // the underlying byte buffer may throw a BufferOverflowException, + // we need to catch this and return false indicating append failure + try { + stream.writeLong(offset); + stream.writeInt(record.size()); + ByteBuffer buffer = record.buffer(); + stream.write(buffer.array(), buffer.arrayOffset(), buffer.limit()); + } catch (BufferOverflowException e) { + // TODO: logging please + // rewind partial writes + return false; + } catch (IOException e) { + throw new KafkaException(e); + } + endOffset++; + return true; + } + + @Override + public boolean append(long offset, byte[] key, byte[] value) { + maybeInit(); + int curPos = this.buffer.position(); + try { + stream.writeLong(offset); + stream.writeInt(Record.recordSize(key, value)); + Record.write(stream, key, value, compression); + } catch (BufferOverflowException e) { + // TODO: logging please + // rewind partial writes + this.buffer.position(curPos); + return false; + } catch (IOException e) { + throw new KafkaException(e); + } + endOffset++; + return true; + } + + /** + * Stop any further appends and wrap this record set as a single shallow message + */ + @Override + public void close() { + // compute and fill the crc at the beginning of the message + long crc = Record.computeChecksum(buffer, startingPos + Record.MAGIC_OFFSET, buffer.position() - startingPos - Record.MAGIC_OFFSET); + Utils.writeUnsignedInt(buffer, startingPos + LOG_OVERHEAD + Record.CRC_OFFSET, crc); + // put the end offset and record size as the header + buffer.position(startingPos); + buffer.putLong(endOffset); + buffer.putInt(buffer.limit() - startingPos - LOG_OVERHEAD); + super.close(); + } + + @Override + public Iterator iterator() { + return new CompressedRecordsIterator(this.buffer, 
this.compression); + } + + public static class CompressedRecordsIterator extends AbstractIterator { + private DataInputStream stream; + + public CompressedRecordsIterator(ByteBuffer buffer, CompressionType compression) { + ByteBuffer copy = buffer.duplicate(); + copy.flip(); + stream = compression.wrapForInput(buffer); + } + + @Override + protected LogEntry makeNext() { + try { + // try to read the offset and record size first + byte[] overheadBuffer = new byte[LOG_OVERHEAD]; + int ret = stream.read(overheadBuffer, 0, LOG_OVERHEAD); + if (ret == -1) + return allDone(); + else if (ret != LOG_OVERHEAD) + throw new IllegalStateException("Invalid compressed record"); + ByteBuffer buffer = ByteBuffer.wrap(overheadBuffer); + long offset = buffer.getLong(); + int size = buffer.getInt(); + if (size < 0) + throw new IllegalStateException("Record with size " + size); + // try read the record + byte[] recordBuffer = new byte[size]; + ret = stream.read(recordBuffer, 0, size); + if (ret != size) + throw new IllegalStateException("Invalid compressed record"); + buffer = ByteBuffer.wrap(recordBuffer); + buffer.limit(size); + return new LogEntry(offset, new Record(buffer)); + } catch (IOException e) { + throw new KafkaException(e); + } + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 906da02..cfa0b7f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -16,6 +16,14 @@ */ package org.apache.kafka.common.record; +import java.io.*; +import java.nio.ByteBuffer; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; +import java.lang.reflect.InvocationTargetException; + +import org.apache.kafka.common.KafkaException; + /** * The compression type to use */ @@ -30,6 +38,84 @@ public enum CompressionType { this.name = name; } + public DataOutputStream wrapForOutput(ByteBuffer buffer) { + try { + switch (id) { + case 0: + return new DataOutputStream( + new ByteBufferOutputStream(buffer) + ); + case 1: + return new DataOutputStream( + new GZIPOutputStream( + new ByteBufferOutputStream(buffer) + )); + case 2: + ClassLoader classLoader = ClassLoader.class.getClassLoader(); + + try { + Class SnappyOutputStream = classLoader.loadClass("org.xerial.snappy.SnappyOutputStream"); + OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class) + .newInstance(new ByteBufferOutputStream(buffer)); + return new DataOutputStream(stream); + } catch (ClassNotFoundException e) { + throw new KafkaException(e); + } catch (NoSuchMethodException e) { + throw new KafkaException(e); + } catch (InvocationTargetException e) { + throw new KafkaException(e); + } catch (InstantiationException e) { + throw new KafkaException(e); + } catch (IllegalAccessException e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type id: " + id); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + + public DataInputStream wrapForInput(ByteBuffer buffer) { + try { + switch (id) { + case 0: + return new DataInputStream( + new ByteBufferInputStream(buffer) + ); + case 1: + return new DataInputStream( + new GZIPInputStream( + new ByteBufferInputStream(buffer) + )); + case 2: + ClassLoader classLoader = ClassLoader.class.getClassLoader(); + + try { + Class SnappyInputStream = 
classLoader.loadClass("org.xerial.snappy.SnappyInputStream"); + InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class) + .newInstance(new ByteBufferInputStream(buffer)); + return new DataInputStream(stream); + } catch (ClassNotFoundException e) { + throw new KafkaException(e); + } catch (NoSuchMethodException e) { + throw new KafkaException(e); + } catch (InvocationTargetException e) { + throw new KafkaException(e); + } catch (InstantiationException e) { + throw new KafkaException(e); + } catch (IllegalAccessException e) { + throw new KafkaException(e); + } + default: + throw new IllegalArgumentException("Unknown compression type id: " + id); + } + } catch (IOException e) { + throw new KafkaException(e); + } + } + public static CompressionType forId(int id) { switch (id) { case 0: @@ -53,4 +139,5 @@ public enum CompressionType { else throw new IllegalArgumentException("Unknown compression name: " + name); } + } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 9d8935f..f06d904 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -29,7 +29,8 @@ import org.apache.kafka.common.utils.AbstractIterator; */ public class MemoryRecords implements Records { - private final ByteBuffer buffer; + protected final ByteBuffer buffer; + private boolean closed; public MemoryRecords(int size) { this(ByteBuffer.allocate(size)); @@ -37,32 +38,44 @@ public class MemoryRecords implements Records { public MemoryRecords(ByteBuffer buffer) { this.buffer = buffer; + closed = false; } /** - * Append the given record and offset to the buffer + * Try to append the given record and offset to the buffer + * @return true if append succeeds, false otherwise */ - public void append(long offset, Record record) { + public boolean append(long offset, Record record) { buffer.putLong(offset); buffer.putInt(record.size()); buffer.put(record.buffer()); record.buffer().rewind(); + return true; } /** - * Append a new record and offset to the buffer + * Try to append a new record and offset to the buffer + * @return true if append succeeds, false otherwise */ - public void append(long offset, byte[] key, byte[] value, CompressionType type) { + public boolean append(long offset, byte[] key, byte[] value) { buffer.putLong(offset); buffer.putInt(Record.recordSize(key, value)); - Record.write(this.buffer, key, value, type); + Record.write(this.buffer, key, value, CompressionType.NONE); + return true; } /** * Check if we have room for a new record containing the given key/value pair */ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value); + return !closed && this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value); + } + + /** + * Mark this record set as ready and stop any further appends + */ + public void close() { + closed = true; } /** Write the records in this set to the given channel */ @@ -93,27 +106,51 @@ public class MemoryRecords implements Records { public static class RecordsIterator extends AbstractIterator { private final ByteBuffer buffer; + private AbstractIterator innerIter; + public RecordsIterator(ByteBuffer buffer) { ByteBuffer copy = buffer.duplicate(); copy.flip(); this.buffer = copy; } + /* TODO: allow shallow iteration */ @Override protected 
LogEntry makeNext() { - if (buffer.remaining() < Records.LOG_OVERHEAD) - return allDone(); - long offset = buffer.getLong(); - int size = buffer.getInt(); - if (size < 0) - throw new IllegalStateException("Record with size " + size); - if (buffer.remaining() < size) - return allDone(); - ByteBuffer rec = buffer.slice(); - rec.limit(size); - this.buffer.position(this.buffer.position() + size); - return new LogEntry(offset, new Record(rec)); + if (innerDone()) { + if (buffer.remaining() < Records.LOG_OVERHEAD) + return allDone(); + long offset = buffer.getLong(); + int size = buffer.getInt(); + if (size < 0) + throw new IllegalStateException("Record with size " + size); + if (buffer.remaining() < size) + return allDone(); + ByteBuffer rec = buffer.slice(); + rec.limit(size); + this.buffer.position(this.buffer.position() + size); + LogEntry entry = new LogEntry(offset, new Record(rec)); + + CompressionType compression = entry.record().compressionType(); + if (compression != CompressionType.NONE) { + // skip the header + ByteBuffer buffer = entry.record().buffer(); + buffer.position(Record.HEADER_SIZE); + // init the inner iterator + CompressedMemoryRecords records = new CompressedMemoryRecords(buffer.slice(), compression); + innerIter = (AbstractIterator) records.iterator(); + // the new compressed records should at least has one message + return innerIter.next(); + } else { + return entry; + } + } else { + return innerIter.next(); + } } - } + private boolean innerDone() { + return (innerIter == null || !innerIter.hasNext()); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index f1dc977..5e60045 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -16,8 +16,13 @@ */ package org.apache.kafka.common.record; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.Crc32; import org.apache.kafka.common.utils.Utils; @@ -44,9 +49,9 @@ public final class Record { public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH; /** - * The minimum valid size for the record header + * The size for the record header */ - public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH; + public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH; /** * The current "magic" value @@ -80,9 +85,18 @@ public final class Record { * @param valueSize The size of the payload to use */ public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { - this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize - : value.length - valueOffset))); - write(this.buffer, key, value, codec, valueOffset, valueSize); + this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, + value == null ? 0 : valueSize >= 0 ? 
valueSize : value.length - valueOffset))); + + try { + DataOutputStream stream = new DataOutputStream( + new ByteBufferOutputStream(this.buffer)); + long crc = computeChecksum(key, value, codec, valueOffset, valueSize); + byte attributes = computeAttributes(codec); + write(stream, crc, attributes, key, value, valueOffset, valueSize); + } catch (IOException e) { + throw new KafkaException(e); + } this.buffer.rewind(); } @@ -102,40 +116,57 @@ public final class Record { this(null, value, CompressionType.NONE); } - public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) { - // skip crc, we will fill that in at the end - int pos = buffer.position(); - buffer.position(pos + MAGIC_OFFSET); - buffer.put(CURRENT_MAGIC_VALUE); + public static byte computeAttributes(CompressionType codec) { byte attributes = 0; if (codec.id > 0) attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id)); - buffer.put(attributes); + return attributes; + } + + public static void write(DataOutputStream stream, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) throws IOException { + // write crc + stream.writeInt((int) (crc & 0xffffffffL)); + // write magic value + stream.writeByte(CURRENT_MAGIC_VALUE); + // write attributes + stream.writeByte(attributes); // write the key if (key == null) { - buffer.putInt(-1); + stream.writeInt(-1); } else { - buffer.putInt(key.length); - buffer.put(key, 0, key.length); + stream.writeInt(key.length); + stream.write(key, 0, key.length); } // write the value if (value == null) { - buffer.putInt(-1); + stream.writeInt(-1); } else { int size = valueSize >= 0 ? valueSize : (value.length - valueOffset); - buffer.putInt(size); - buffer.put(value, valueOffset, size); + stream.writeInt(size); + stream.write(value, valueOffset, size); } + } + + public static Record read(DataInputStream stream) throws IOException { + return null; + } - // now compute the checksum and fill it in - long crc = computeChecksum(buffer, - buffer.arrayOffset() + pos + MAGIC_OFFSET, - buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset()); - Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc); + public static void write(DataOutputStream stream, byte[] key, byte[] value, CompressionType codec) throws IOException { + long crc = computeChecksum(key, value, codec, 0, -1); + byte attributes = computeAttributes(codec); + write(stream, crc, attributes, key, value, 0, -1); } public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) { - write(buffer, key, value, codec, 0, -1); + try { + DataOutputStream stream = new DataOutputStream( + new ByteBufferOutputStream(buffer)); + long crc = computeChecksum(key, value, codec, 0, -1); + byte attributes = computeAttributes(codec); + write(stream, crc, attributes, key, value, 0, -1); + } catch (IOException e) { + throw new KafkaException(e); + } } public static int recordSize(byte[] key, byte[] value) { @@ -154,9 +185,41 @@ public final class Record { * Compute the checksum of the record from the record contents */ public static long computeChecksum(ByteBuffer buffer, int position, int size) { - return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset()); + Crc32 crc = new Crc32(); + crc.update(buffer.array(), buffer.arrayOffset() + position, size); + return crc.getValue(); + } + + /** + * Compute the checksum of the record from the attributes, key and value payloads + */ + public static long 
computeChecksum(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
+        // TODO: we can remove this duplicate logic when we change the message format to put crc at the end
+        Crc32 crc = new Crc32();
+        crc.update(CURRENT_MAGIC_VALUE);
+        byte attributes = 0;
+        if (codec.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
+        crc.update(attributes);
+        // update for the key
+        if (key == null) {
+            Utils.updateCrc32(crc, -1);
+        } else {
+            Utils.updateCrc32(crc, key.length);
+            crc.update(key, 0, key.length);
+        }
+        // update for the value
+        if (value == null) {
+            Utils.updateCrc32(crc, -1);
+        } else {
+            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
+            Utils.updateCrc32(crc, size);
+            crc.update(value, valueOffset, size);
+        }
+        return crc.getValue();
+    }
+
     /**
      * Compute the checksum of the record from the record contents
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 9c34e7d..9697d51 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -105,8 +105,8 @@ public class Utils {
     }
 
     /**
-     * Compute the CRC32 of the segment of the byte array given by the specificed size and offset
-     * 
+     * Compute the CRC32 of the segment of the byte array given by the specified size and offset
+     * 
      * @param bytes The bytes to checksum
      * @param offset the offset at which to begin checksumming
      * @param size the number of bytes to checksum
@@ -119,6 +119,16 @@ public class Utils {
     }
 
     /**
+     * Update the given CRC32 with the four big-endian bytes of an integer
+     */
+    public static void updateCrc32(Crc32 crc, int input) {
+        crc.update((byte) (input >> 24));
+        crc.update((byte) (input >> 16));
+        crc.update((byte) (input >> 8));
+        crc.update((byte) input /* >> 0 */);
+    }
+
+    /**
      * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
      * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
*/ diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index b0745b5..2262494 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -26,9 +26,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import org.apache.kafka.common.record.LogEntry; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; import org.junit.Test; public class MemoryRecordsTest { @@ -43,7 +40,7 @@ public class MemoryRecordsTest { for (int i = 0; i < list.size(); i++) { Record r = list.get(i); recs1.append(i, r); - recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType()); + recs2.append(i, toArray(r.key()), toArray(r.value())); } for (int iteration = 0; iteration < 2; iteration++) { @@ -54,6 +51,7 @@ public class MemoryRecordsTest { LogEntry entry = iter.next(); assertEquals((long) i, entry.offset()); assertEquals(list.get(i), entry.record()); + entry.record().ensureValid(); } assertFalse(iter.hasNext()); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java index ae54d67..3998696 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java @@ -39,12 +39,16 @@ import org.junit.runners.Parameterized.Parameters; public class RecordTest { private ByteBuffer key; + private byte[] keyArray; private ByteBuffer value; + private byte[] valueArray; private CompressionType compression; private Record record; public RecordTest(byte[] key, byte[] value, CompressionType compression) { + this.keyArray = key; this.key = key == null ? null : ByteBuffer.wrap(key); + this.valueArray = value; this.value = value == null ? null : ByteBuffer.wrap(value); this.compression = compression; this.record = new Record(key, value, compression); @@ -66,6 +70,8 @@ public class RecordTest { @Test public void testChecksum() { assertEquals(record.checksum(), record.computeChecksum()); + assertEquals(record.checksum(), record.computeChecksum( + this.keyArray, this.valueArray, this.compression, 0, -1)); assertTrue(record.isValid()); for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) { Record copy = copyOf(record); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java new file mode 100644 index 0000000..46713d7 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class CrcTest {
+
+    @Test
+    public void testUpdate() {
+        final byte[] bytes = "Any String you want".getBytes();
+        final int len = bytes.length;
+
+        Crc32 crc1 = new Crc32();
+        Crc32 crc2 = new Crc32();
+        Crc32 crc3 = new Crc32();
+
+        crc1.update(bytes, 0, len);
+        for (int i = 0; i < len; i++)
+            crc2.update(bytes[i]);
+        crc3.update(bytes, 0, len / 2);
+        crc3.update(bytes, len / 2, len - len / 2);
+
+        assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue());
+        assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 36cfc0f..76a17e8 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -88,7 +88,7 @@ public class TestUtils {
     /**
      * Generate an array of random bytes
      * 
-     * @param numBytes The size of the array
+     * @param size The size of the array
      */
     public static byte[] randomBytes(int size) {
         byte[] bytes = new byte[size];
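
Usage note (illustration, not part of the patch): end to end, compression is now driven purely by producer configuration — KafkaProducer reads the new "compression.type" setting (default "none"), RecordAccumulator builds a CompressedMemoryRecords batch instead of a plain MemoryRecords whenever a codec is selected, and on the read side MemoryRecords.RecordsIterator detects the compression attribute on the wrapper record and descends into CompressedRecordsIterator for the inner messages. A minimal sketch of how an application would enable it, assuming this era's non-generic byte[] ProducerRecord and a Properties-based KafkaProducer constructor; the topic name and payloads are made up and the other required producer configs are omitted:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    // ... the usual producer configs (broker list, buffer sizes, etc.) go here ...
    props.put("compression.type", "gzip");   // new config added by this patch; "none" is the default
    KafkaProducer producer = new KafkaProducer(props);
    producer.send(new ProducerRecord("my-topic", "key".getBytes(), "value".getBytes()));
    producer.close();

"snappy" is also accepted, but it is resolved reflectively (CompressionType loads org.xerial.snappy.SnappyOutputStream at runtime), so the snappy-java jar must be on the classpath; the dependency stays optional for users who only ever use "none" or "gzip".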