diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
new file mode 100644
index 0000000..f7577dc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed input stream
+ */
+public class ByteBufferInputStream extends InputStream {
+
+    ByteBuffer buffer;
+
+    public ByteBufferInputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public int read() throws IOException {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+        return buffer.get() & 0xFF;
+    }
+
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buffer.remaining());
+        buffer.get(bytes, off, len);
+        return len;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
new file mode 100644
index 0000000..8cf759c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+
+/**
+ * A byte buffer backed output stream
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+    ByteBuffer buffer;
+
+    public ByteBufferOutputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public void write(int b) throws IOException {
+        buffer.put((byte) b);
+    }
+
+    public void write(byte[] bytes, int off, int len) throws IOException {
+        buffer.put(bytes, off, len);
+    }
+}
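The two stream adapters above exist so that the record code can layer DataOutputStream, DataInputStream and the JDK GZIP streams directly on top of a ByteBuffer. A minimal round-trip sketch of how they compose (illustrative only, not part of the patch; the buffer size and values are arbitrary):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.ByteBufferInputStream;
    import org.apache.kafka.common.record.ByteBufferOutputStream;

    public class ByteBufferStreamExample {
        public static void main(String[] args) throws Exception {
            ByteBuffer buffer = ByteBuffer.allocate(64);   // arbitrary capacity

            // write primitives through the buffer-backed output stream
            DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
            out.writeLong(42L);
            out.writeInt(7);
            out.flush();

            // flip and read the same bytes back through the buffer-backed input stream
            buffer.flip();
            DataInputStream in = new DataInputStream(new ByteBufferInputStream(buffer));
            System.out.println(in.readLong() + " " + in.readInt());   // prints: 42 7
        }
    }

This is the same chaining that Record.write(ByteBuffer, ...) and CompressedMemoryRecords.maybeInit() set up further down in the patch.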
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java
new file mode 100644
index 0000000..6702536
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+
+
+/**
+ * A {@link Records} implementation with in-place compression backed by a ByteBuffer.
+ */
+public class CompressedMemoryRecords extends MemoryRecords {
+
+    private DataOutputStream stream;
+    private int pos;
+    private long endOffset = -1L;
+
+
+    public CompressedMemoryRecords(ByteBuffer buffer) {
+        super(buffer);
+    }
+
+    /* TODO: go beyond GZIP */
+    public void maybeInit() {
+        if (stream == null) {
+            ByteBuffer buffer = this.buffer();
+            pos = buffer.position();
+            // write the attribute first
+            buffer.position(pos + OFFSET_LENGTH + Record.MAGIC_OFFSET);
+            byte attributes = Record.computeAttributes(CompressionType.GZIP);
+            buffer.put(attributes);
+            try {
+                this.stream = new DataOutputStream(
+                    new GZIPOutputStream(
+                        new ByteBufferOutputStream(buffer)
+                    ));
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+        }
+    }
+
+    /**
+     * Append the given record and offset to the buffer
+     */
+    @Override
+    public void append(long offset, Record record) {
+        maybeInit();
+        try {
+            stream.writeLong(offset);
+            stream.writeInt(record.size());
+            ByteBuffer buffer = record.buffer();
+            stream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+        endOffset++;
+    }
+
+    /**
+     * Append a new record and offset to the buffer
+     */
+    @Override
+    public void append(long offset, byte[] key, byte[] value, CompressionType type) {
+        maybeInit();
+        try {
+            stream.writeLong(offset);
+            stream.writeInt(Record.recordSize(key, value));
+            Record.write(stream, key, value, type);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+        endOffset++;
+    }
+
+    /** Write the records in this set as a compressed message to the given channel */
+    public int writeTo(GatheringByteChannel channel) throws IOException {
+        // add the crc of the compressed message
+        ByteBuffer buffer = this.buffer();
+        // then fill the crc at the beginning
+        long crc = Record.computeChecksum(buffer, pos + Record.MAGIC_OFFSET, buffer.position() - pos - Record.MAGIC_OFFSET);
+        Utils.writeUnsignedInt(buffer, pos + OFFSET_LENGTH + Record.CRC_OFFSET, crc);
+        // put the end offset
+        buffer.position(pos);
+        buffer.putLong(endOffset);
+
+        return channel.write(buffer);
+    }
+
+    /* TODO: in-place decompression */
+    @Override
+    public Iterator iterator() {
+        // first decompress then return the resulted buffer
+        ByteArrayOutputStream outputBuffer = new ByteArrayOutputStream();
+
+        try {
+            DataInputStream inputStream = new DataInputStream(
+                new GZIPInputStream(
+                    new ByteBufferInputStream(this.buffer())
+                ));
+            byte[] interBuffer = new byte[1024];
+            while (true) {
+                int read = inputStream.read(interBuffer);
+                if (read < 0)
+                    break;
+                outputBuffer.write(interBuffer, 0, read);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        return new RecordsIterator(ByteBuffer.wrap(outputBuffer.toByteArray()));
+    }
+}
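Roughly the call sequence the new class appears designed for (an illustrative sketch only, not from the patch; the capacity is arbitrary, and writeTo()/iterator() are left out because the current code never finishes the GZIP stream):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressedMemoryRecords;
    import org.apache.kafka.common.record.CompressionType;

    public class CompressedMemoryRecordsExample {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(4096);   // arbitrary capacity
            CompressedMemoryRecords records = new CompressedMemoryRecords(buffer);

            // offsets are assigned by the caller; each append goes through the
            // DataOutputStream -> GZIPOutputStream -> ByteBufferOutputStream chain set up in maybeInit()
            records.append(0L, "key0".getBytes(), "value0".getBytes(), CompressionType.NONE);
            records.append(1L, null, "value1".getBytes(), CompressionType.NONE);

            // writeTo(channel) would then back-fill the wrapper CRC and end offset and write the
            // buffer to the channel, and iterator() decompresses the payload to walk the entries
        }
    }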
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 9d8935f..f48a44c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.utils.AbstractIterator;
  */
 public class MemoryRecords implements Records {
 
+    public static final int OFFSET_LENGTH = 8;
+
     private final ByteBuffer buffer;
 
     public MemoryRecords(int size) {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index f1dc977..c69ed0b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.common.record;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Crc32;
 import org.apache.kafka.common.utils.Utils;
 
 /**
@@ -44,9 +48,9 @@ public final class Record {
     public static final int RECORD_OVERHEAD = KEY_OFFSET + VALUE_SIZE_LENGTH;
 
     /**
-     * The minimum valid size for the record header
+     * The size for the record header
      */
-    public static final int MIN_HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
 
     /**
      * The current "magic" value
@@ -80,9 +84,18 @@
      * @param valueSize The size of the payload to use
      */
     public Record(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
-        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length, value == null ? 0 : valueSize >= 0 ? valueSize
-            : value.length - valueOffset)));
-        write(this.buffer, key, value, codec, valueOffset, valueSize);
+        this(ByteBuffer.allocate(recordSize(key == null ? 0 : key.length,
+            value == null ? 0 : valueSize >= 0 ? valueSize : value.length - valueOffset)));
+
+        try {
+            DataOutputStream stream = new DataOutputStream(
+                new ByteBufferOutputStream(this.buffer));
+            long crc = computeChecksum(key, value, codec, valueOffset, valueSize);
+            byte attributes = computeAttributes(codec);
+            write(stream, crc, attributes, key, value, valueOffset, valueSize);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
         this.buffer.rewind();
     }
 
@@ -102,40 +115,53 @@
         this(null, value, CompressionType.NONE);
     }
 
-    public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
-        // skip crc, we will fill that in at the end
-        int pos = buffer.position();
-        buffer.position(pos + MAGIC_OFFSET);
-        buffer.put(CURRENT_MAGIC_VALUE);
+    public static byte computeAttributes(CompressionType codec) {
         byte attributes = 0;
         if (codec.id > 0)
             attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
-        buffer.put(attributes);
+        return attributes;
+    }
+
+    public static void write(DataOutputStream stream, long crc, byte attributes, byte[] key, byte[] value, int valueOffset, int valueSize) throws IOException {
+        // write crc
+        stream.writeInt((int) (crc & 0xffffffffL));
+        // write magic value
+        stream.writeByte(CURRENT_MAGIC_VALUE);
+        // write attributes
+        stream.writeByte(attributes);
         // write the key
         if (key == null) {
-            buffer.putInt(-1);
+            stream.writeInt(-1);
         } else {
-            buffer.putInt(key.length);
-            buffer.put(key, 0, key.length);
+            stream.writeInt(key.length);
+            stream.write(key, 0, key.length);
        }
         // write the value
         if (value == null) {
-            buffer.putInt(-1);
+            stream.writeInt(-1);
         } else {
             int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
-            buffer.putInt(size);
-            buffer.put(value, valueOffset, size);
+            stream.writeInt(size);
+            stream.write(value, valueOffset, size);
         }
+    }
 
-        // now compute the checksum and fill it in
-        long crc = computeChecksum(buffer,
-                                   buffer.arrayOffset() + pos + MAGIC_OFFSET,
-                                   buffer.position() - pos - MAGIC_OFFSET - buffer.arrayOffset());
-        Utils.writeUnsignedInt(buffer, pos + CRC_OFFSET, crc);
+    public static void write(DataOutputStream stream, byte[] key, byte[] value, CompressionType codec) throws IOException {
+        long crc = computeChecksum(key, value, codec, 0, -1);
+        byte attributes = computeAttributes(codec);
+        write(stream, crc, attributes, key, value, 0, -1);
     }
 
     public static void write(ByteBuffer buffer, byte[] key, byte[] value, CompressionType codec) {
-        write(buffer, key, value, codec, 0, -1);
+        try {
+            DataOutputStream stream = new DataOutputStream(
+                new ByteBufferOutputStream(buffer));
+            long crc = computeChecksum(key, value, codec, 0, -1);
+            byte attributes = computeAttributes(codec);
+            write(stream, crc, attributes, key, value, 0, -1);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
     }
 
     public static int recordSize(byte[] key, byte[] value) {
@@ -154,9 +180,41 @@
      * Compute the checksum of the record from the record contents
      */
     public static long computeChecksum(ByteBuffer buffer, int position, int size) {
-        return Utils.crc32(buffer.array(), buffer.arrayOffset() + position, size - buffer.arrayOffset());
+        Crc32 crc = new Crc32();
+        crc.update(buffer.array(), buffer.arrayOffset() + position, size);
+        return crc.getValue();
+    }
+
+    /**
+     * Compute the checksum of the record from the attributes, key and value payloads
+     */
+    public static long computeChecksum(byte[] key, byte[] value, CompressionType codec, int valueOffset, int valueSize) {
+        // TODO: we can remove this duplicate logic when we change the message format to put crc at the end
+        Crc32 crc = new Crc32();
+        crc.update(CURRENT_MAGIC_VALUE);
+        byte attributes = 0;
+        if (codec.id > 0)
+            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & codec.id));
+        crc.update(attributes);
+        // update for the key
+        if (key == null) {
+            Utils.updateCrc32(crc, -1);
+        } else {
+            Utils.updateCrc32(crc, key.length);
+            crc.update(key, 0, key.length);
+        }
+        // update for the value
+        if (value == null) {
+            Utils.updateCrc32(crc, -1);
+        } else {
+            int size = valueSize >= 0 ? valueSize : (value.length - valueOffset);
+            Utils.updateCrc32(crc, size);
+            crc.update(value, valueOffset, size);
+        }
+        return crc.getValue();
     }
+
     /**
      * Compute the checksum of the record from the record contents
      */
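The field-based computeChecksum(key, value, codec, valueOffset, valueSize) added above duplicates the CRC logic so that the checksum can be obtained before anything is serialized; by construction it has to agree with the CRC stored in a record built from the same inputs, which is what the RecordTest change further down asserts. A small standalone check (illustrative only; key and value are arbitrary):

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.Record;

    public class RecordChecksumExample {
        public static void main(String[] args) {
            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();

            // CRC computed directly from the fields, without serializing anything
            long fieldCrc = Record.computeChecksum(key, value, CompressionType.NONE, 0, -1);

            // CRC stored in the header of a record serialized from the same fields
            Record record = new Record(key, value, CompressionType.NONE);

            System.out.println(fieldCrc == record.checksum());   // expected: true
        }
    }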
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 9c34e7d..9697d51 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -105,8 +105,8 @@
     }
 
     /**
-     * Compute the CRC32 of the segment of the byte array given by the specificed size and offset
-     *
+     * Compute the CRC32 of the segment of the byte array given by the specified size and offset
+     *
      * @param bytes The bytes to checksum
      * @param offset the offset at which to begin checksumming
      * @param size the number of bytes to checksum
@@ -119,6 +119,16 @@
     }
 
     /**
+     * Update the CRC32 with the four bytes of the given integer (big-endian, matching DataOutputStream.writeInt)
+     */
+    public static void updateCrc32(Crc32 crc, int input) {
+        crc.update((byte) (input >> 24));
+        crc.update((byte) (input >> 16));
+        crc.update((byte) (input >> 8));
+        crc.update((byte) input /* >> 0 */);
+    }
+
+    /**
      * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
      * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
      */
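Utils.updateCrc32 feeds the four bytes of the int to the CRC in big-endian order, the same order DataOutputStream.writeInt uses, which is what keeps the field-based checksum byte-for-byte compatible with the serialized record. A quick equivalence sketch (illustrative only; 1000 is an arbitrary value):

    import org.apache.kafka.common.utils.Crc32;
    import org.apache.kafka.common.utils.Utils;

    public class UpdateCrc32Example {
        public static void main(String[] args) {
            int length = 1000;

            // CRC updated via the new helper
            Crc32 a = new Crc32();
            Utils.updateCrc32(a, length);

            // CRC updated with the same int laid out as the four big-endian bytes writeInt() emits
            Crc32 b = new Crc32();
            b.update(new byte[] {(byte) (length >>> 24), (byte) (length >>> 16),
                                 (byte) (length >>> 8), (byte) length}, 0, 4);

            System.out.println(a.getValue() == b.getValue());   // expected: true
        }
    }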
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index b0745b5..8a0bec9 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -26,9 +26,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.kafka.common.record.LogEntry;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
 import org.junit.Test;
 
 public class MemoryRecordsTest {
@@ -54,6 +51,7 @@
             LogEntry entry = iter.next();
             assertEquals((long) i, entry.offset());
             assertEquals(list.get(i), entry.record());
+            entry.record().ensureValid();
         }
         assertFalse(iter.hasNext());
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index ae54d67..3998696 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -39,12 +39,16 @@
 public class RecordTest {
 
     private ByteBuffer key;
+    private byte[] keyArray;
     private ByteBuffer value;
+    private byte[] valueArray;
     private CompressionType compression;
     private Record record;
 
     public RecordTest(byte[] key, byte[] value, CompressionType compression) {
+        this.keyArray = key;
         this.key = key == null ? null : ByteBuffer.wrap(key);
+        this.valueArray = value;
         this.value = value == null ? null : ByteBuffer.wrap(value);
         this.compression = compression;
         this.record = new Record(key, value, compression);
@@ -66,6 +70,8 @@
     @Test
     public void testChecksum() {
         assertEquals(record.checksum(), record.computeChecksum());
+        assertEquals(record.checksum(), record.computeChecksum(
+            this.keyArray, this.valueArray, this.compression, 0, -1));
         assertTrue(record.isValid());
         for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
             Record copy = copyOf(record);
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 36cfc0f..76a17e8 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -88,7 +88,7 @@ public class TestUtils {
     /**
      * Generate an array of random bytes
      *
-     * @param numBytes The size of the array
+     * @param size The size of the array
      */
     public static byte[] randomBytes(int size) {
         byte[] bytes = new byte[size];