diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/types/PBType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/PBType.java new file mode 100644 index 0000000..6757937 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/PBType.java @@ -0,0 +1,49 @@ +package org.apache.hadoop.hbase.types; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.Message; +import org.apache.hadoop.hbase.util.Order; +import org.apache.hadoop.hbase.util.PositionedByteRange; + +public abstract class PBType implements DataType { + @Override + public boolean isOrderPreserving() { + return false; + } + + @Override + public Order getOrder() { + return null; + } + + @Override + public boolean isNullable() { + return false; + } + + @Override + public boolean isSkippable() { + return true; + } + + @Override + public int encodedLength(T val) { + return val.getSerializedSize(); + } + + public static CodedInputStream inputStreamFromByteRange(PositionedByteRange src) { + return CodedInputStream.newInstance( + src.getBytes(), + src.getOffset() + src.getPosition(), + src.getLength()); + } + + public static CodedOutputStream outputStreamFromByteRange(PositionedByteRange dst) { + return CodedOutputStream.newInstance( + dst.getBytes(), + dst.getOffset() + dst.getPosition(), + dst.getLength() + ); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/types/PBKeyValue.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/types/PBKeyValue.java new file mode 100644 index 0000000..faae4a5 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/types/PBKeyValue.java @@ -0,0 +1,57 @@ +package org.apache.hadoop.hbase.types; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.hbase.protobuf.generated.CellProtos; +import org.apache.hadoop.hbase.util.PositionedByteRange; + +import java.io.IOException; + +public class PBKeyValue extends PBType { + @Override + public Class encodedClass() { + return CellProtos.KeyValue.class; + } + + @Override + public int skip(PositionedByteRange src) { + CellProtos.KeyValue.Builder builder = CellProtos.KeyValue.newBuilder(); + CodedInputStream is = inputStreamFromByteRange(src); + try { + builder.mergeFrom(is); + int consumed = is.getTotalBytesRead(); + src.setPosition(src.getPosition() + consumed); + return consumed; + } catch (IOException e) { + throw new RuntimeException("Error while skipping type.", e); + } + } + + @Override + public CellProtos.KeyValue decode(PositionedByteRange src) { + CellProtos.KeyValue.Builder builder = CellProtos.KeyValue.newBuilder(); + CodedInputStream is = inputStreamFromByteRange(src); + try { + CellProtos.KeyValue ret = builder.mergeFrom(is).build(); + src.setPosition(src.getPosition() + is.getTotalBytesRead()); + return ret; + } catch (IOException e) { + throw new RuntimeException("Error while decoding type.", e); + } + } + + @Override + public int encode(PositionedByteRange dst, CellProtos.KeyValue val) { + CodedOutputStream os = outputStreamFromByteRange(dst); + try { + int before = os.spaceLeft(), after, consumed; + val.writeTo(os); + after = os.spaceLeft(); + consumed = before - after; + dst.setPosition(dst.getPosition() + consumed); + return consumed; + } catch (IOException e) { + throw new RuntimeException("Error while encoding type.", e); + } + } +}