diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
new file mode 100644
index 0000000..f7577dc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A byte buffer backed input stream
+ */
+public class ByteBufferInputStream extends InputStream {
+
+    ByteBuffer buffer;
+
+    public ByteBufferInputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public int read() throws IOException {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+        return buffer.get() & 0xFF;
+    }
+
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buffer.remaining());
+        buffer.get(bytes, off, len);
+        return len;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
new file mode 100644
index 0000000..8cf759c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+
+/**
+ * A byte buffer backed output stream
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+    ByteBuffer buffer;
+
+    public ByteBufferOutputStream(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public void write(int b) throws IOException {
+        buffer.put((byte) b);
+    }
+
+    public void write(byte[] bytes, int off, int len) throws IOException {
+        buffer.put(bytes, off, len);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java
new file mode 100644
index 0000000..38358f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.kafka.common.KafkaException;
+
+
+/**
+ * A {@link Records} implementation with in-place compression backed by a ByteBuffer.
+ */
+public class CompressedMemoryRecords extends MemoryRecords {
+
+    private DataOutputStream outputStream;
+    private DataInputStream inputStream;
+
+    public CompressedMemoryRecords(ByteBuffer buffer) {
+        super(buffer);
+        try {
+            this.outputStream = new DataOutputStream(
+                new GZIPOutputStream(
+                    new ByteBufferOutputStream(this.buffer())
+                ));
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Append the given record and offset to the buffer
+     */
+    @Override
+    public void append(long offset, Record record) {
+        try {
+            outputStream.writeLong(offset);
+            outputStream.writeInt(record.size());
+            outputStream.write(record.buffer().array());
+            record.buffer().rewind();
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Append a new record and offset to the buffer
+     */
+    @Override
+    public void append(long offset, byte[] key, byte[] value, CompressionType type) {
+        // TODO: this will double-create the space for the record;
+        // we do it to avoid duplicating the Record.write logic here
+        Record record = new Record(key, value);
+        append(offset, record);
+    }
+
+    @Override
+    public Iterator iterator() {
+        // first decompress, then iterate over the resulting buffer
+        ByteArrayOutputStream outputBuffer = new ByteArrayOutputStream();
+
+        try {
+            inputStream = new DataInputStream(
+                new GZIPInputStream(
+                    new ByteBufferInputStream(this.buffer())
+                ));
+            byte[] interBuffer = new byte[1024];
+            while (true) {
+                int read = inputStream.read(interBuffer);
+                if (read < 0)
+                    break;
+                outputBuffer.write(interBuffer, 0, read);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        return new RecordsIterator(ByteBuffer.wrap(outputBuffer.toByteArray()));
+    }
+}
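For illustration only, not part of the patch: a minimal round-trip sketch of how the two new stream classes are intended to compose with the JDK GZIP streams, which is essentially what CompressedMemoryRecords does internally. The class name ByteBufferStreamDemo and the 1 KB buffer size are arbitrary; the sketch only assumes the two classes added above are available.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.kafka.common.record.ByteBufferInputStream;
import org.apache.kafka.common.record.ByteBufferOutputStream;

public class ByteBufferStreamDemo {
    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // compress a small payload directly into the buffer
        GZIPOutputStream gzipOut = new GZIPOutputStream(new ByteBufferOutputStream(buffer));
        gzipOut.write("hello kafka".getBytes(StandardCharsets.UTF_8));
        gzipOut.close(); // finishes the GZIP stream so all compressed bytes reach the buffer

        // flip the buffer, then read the compressed bytes back and decompress them
        buffer.flip();
        GZIPInputStream gzipIn = new GZIPInputStream(new ByteBufferInputStream(buffer));
        byte[] chunk = new byte[64];
        StringBuilder decoded = new StringBuilder();
        int read;
        while ((read = gzipIn.read(chunk)) >= 0)
            decoded.append(new String(chunk, 0, read, StandardCharsets.UTF_8));

        System.out.println(decoded); // prints "hello kafka"
    }
}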
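Also for illustration, a hypothetical sketch of the entry layout that append() writes into the compressed stream, and that the decompressed buffer handed to RecordsIterator would therefore contain: an 8-byte offset, a 4-byte record size, then that many record bytes. The class and method names (CompressedLayoutSketch, dumpEntries) are invented for the example.

import java.nio.ByteBuffer;

public class CompressedLayoutSketch {
    // Walk a decompressed buffer laid out as repeated [offset:8][size:4][record bytes].
    public static void dumpEntries(ByteBuffer decompressed) {
        while (decompressed.remaining() >= 12) {   // need at least an offset and a size
            long offset = decompressed.getLong();
            int size = decompressed.getInt();
            byte[] recordBytes = new byte[size];
            decompressed.get(recordBytes);         // the serialized Record itself
            System.out.println("offset=" + offset + ", record size=" + size);
        }
    }
}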