diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 8835957..69b70bd 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -79,7 +79,8 @@ import com.google.common.annotations.VisibleForTesting; * and actual tag bytes length. */ @InterfaceAudience.Private -public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, SettableTimestamp { +public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, SettableTimestamp, + OutputStreamableCell { private static final ArrayList EMPTY_ARRAY_LIST = new ArrayList(); private static final Log LOG = LogFactory.getLog(KeyValue.class); @@ -2501,46 +2502,32 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * Named oswrite so does not clash with {@link #write(KeyValue, DataOutput)} * @param kv * @param out - * @return Length written on stream - * @throws IOException - * @see #create(DataInput) for the inverse function - * @see #write(KeyValue, DataOutput) - * @deprecated use {@link #oswrite(KeyValue, OutputStream, boolean)} instead - */ - @Deprecated - public static long oswrite(final KeyValue kv, final OutputStream out) - throws IOException { - int length = kv.getLength(); - // This does same as DataOuput#writeInt (big-endian, etc.)
- out.write(Bytes.toBytes(length)); - out.write(kv.getBuffer(), kv.getOffset(), length); - return length + Bytes.SIZEOF_INT; - } - - /** - * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do - * not require a {@link DataOutput}, just take plain {@link OutputStream} - * Named oswrite so does not clash with {@link #write(KeyValue, DataOutput)} - * @param kv - * @param out * @param withTags * @return Length written on stream * @throws IOException * @see #create(DataInput) for the inverse function * @see #write(KeyValue, DataOutput) * @see KeyValueUtil#oswrite(Cell, OutputStream, boolean) + * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. + * Instead call {@link #stream(OutputStream, boolean)} on the KeyValue itself. */ + @Deprecated public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags) throws IOException { + return kv.stream(out, withTags); + } + + @Override + public int stream(OutputStream out, boolean withTags) throws IOException { // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls // check KeyValueUtil#oswrite also and do necessary changes. - int length = kv.getLength(); + int length = this.length; if (!withTags) { - length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; + length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; } // This does same as DataOuput#writeInt (big-endian, etc.)
StreamUtils.writeInt(out, length); - out.write(kv.getBuffer(), kv.getOffset(), length); + out.write(this.bytes, this.offset, length); return length + Bytes.SIZEOF_INT; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 4ef14fc..4f6d55b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -58,7 +58,7 @@ public class KeyValueUtil { cell.getValueLength(), cell.getTagsLength(), true); } - private static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) { + public static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) { if (withTags) { return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen, tlen)); } @@ -664,8 +664,8 @@ public class KeyValueUtil { public static void oswrite(final Cell cell, final OutputStream out, final boolean withTags) throws IOException { - if (cell instanceof KeyValue) { - KeyValue.oswrite((KeyValue) cell, out, withTags); + if (cell instanceof OutputStreamableCell) { + ((OutputStreamableCell)cell).stream(out, withTags); } else { short rlen = cell.getRowLength(); byte flen = cell.getFamilyLength(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java index c4c8351..1cbcab6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java @@ -19,7 +19,12 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; +import java.io.OutputStream; + import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.util.Bytes; /** * An extension of the KeyValue where the
tags length is always 0 @@ -34,4 +39,14 @@ public class NoTagsKeyValue extends KeyValue { public int getTagsLength() { return 0; } + + @Override + public int stream(OutputStream out, boolean withTags) throws IOException { + // No tags in this cell, so withTags is ignored and the whole backing range is written. + // Keep in sync with KeyValueUtil#oswrite, which serializes any Cell as a KeyValue. + // This does same as DataOutput#writeInt (big-endian, etc.) + StreamUtils.writeInt(out, this.length); + out.write(this.bytes, this.offset, this.length); + return this.length + Bytes.SIZEOF_INT; + } } \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OutputStreamableCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OutputStreamableCell.java new file mode 100644 index 0000000..8e73561 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OutputStreamableCell.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public interface OutputStreamableCell { + + /** + * Write this cell to an OutputStream as a length-prefixed KeyValue. + * @param out Stream to which cell has to be written + * @param withTags Whether to write tags. + * @return how many bytes written to out, including the leading length int. + * @throws IOException If writing to out fails + */ + int stream(OutputStream out, boolean withTags) throws IOException; +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 55fd811..9db287b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.Cell; @@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.OutputStreamableCell; import org.apache.hadoop.hbase.SettableSequenceId; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; @@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.util.LRUDictionary; +import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils; import
org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -345,7 +348,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { // there. So this has to be an instance of SettableSequenceId. SeekerState need not be // SettableSequenceId as we never return that to top layers. When we have to, we make // ClonedSeekerState from it. - protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId { + protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId, + OutputStreamableCell { private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY)); @@ -534,6 +538,35 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { public long heapSize() { return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength; } + + @Override + public int stream(OutputStream out, boolean withTags) throws IOException { + int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, + tagsLength, withTags); + StreamUtils.writeInt(out, lenToWrite); + StreamUtils.writeInt(out, keyOnlyBuffer.length); + StreamUtils.writeInt(out, valueLength); + // Write key + out.write(keyOnlyBuffer); + // Write value + assert this.currentBuffer.hasArray(); + out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.valueOffset, + this.valueLength); + if (withTags && this.tagsLength > 0) { + // A cell without tags has no tags part at all, and lenToWrite leaves no room for one. + // Else 2 bytes tags length (short way, even if the type is int) followed by tags bytes. + // As this is a non -ve number, we save the sign bit.
See HBASE-11437 + out.write((byte) (0xff & (this.tagsLength >> 8))); + out.write((byte) (0xff & this.tagsLength)); + if (this.tagCompressionContext != null) { + out.write(cloneTagsBuffer); + } else { + out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.tagsOffset, + this.tagsLength); + } + } + return lenToWrite + Bytes.SIZEOF_INT; + } } protected abstract static class