diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
index 51801a8..7bc8001 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
@@ -34,7 +34,7 @@ public abstract class BaseDecoder implements Codec.Decoder {
   protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
   protected final InputStream in;
   private boolean hasNext = true;
-  private Cell current = null;
+  protected Cell current = null;
 
   public BaseDecoder(final InputStream in) {
     this.in = in;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/FastDiffDeltaKeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/FastDiffDeltaKeyValueCodec.java
new file mode 100644
index 0000000..e9a6e70
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/FastDiffDeltaKeyValueCodec.java
@@ -0,0 +1,385 @@
+package org.apache.hadoop.hbase.codec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class FastDiffDeltaKeyValueCodec implements Codec {
+
+  static final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
+  static final int SHIFT_TIMESTAMP_LENGTH = 0;
+  static final int FLAG_SAME_KEY_LENGTH = 1 << 3;
+  static final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
+  static final int FLAG_SAME_TYPE = 1 << 5;
+  static final int FLAG_SAME_VALUE = 1 << 6;
+
+  public static class FastDiffDeltaKeyValueEncoder extends BaseEncoder {
+    DataOutputStream dos = null;
+    Cell prevCell;
+
+    public FastDiffDeltaKeyValueEncoder(OutputStream out) {
+      super(out);
+      dos = new DataOutputStream(out);
+    }
+
+    @Override
+    public void write(Cell cell) throws IOException {
+      byte flag = 0;
+      int kLength = KeyValueUtil.keyLength(cell);
+      int vLength = cell.getValueLength();
+
+      if (prevCell == null) {
+        // copy the key, there is no common prefix with none
+        dos.write(flag);
+        ByteBufferUtils.putCompressedInt(dos, kLength);
+        ByteBufferUtils.putCompressedInt(dos, vLength);
+        ByteBufferUtils.putCompressedInt(dos, 0);
+        CellUtil.writeFlatKey(cell, dos);
+        // Write the value part
+        dos.write(cell.getValueArray(), cell.getValueOffset(),
+            cell.getValueLength());
+      } else {
+        int preKeyLength = KeyValueUtil.keyLength(prevCell);
+        int preValLength = prevCell.getValueLength();
+        // find a common prefix and skip it
+        int commonPrefix = CellUtil.findCommonPrefixInFlatKey(cell, prevCell,
+            true, false);
+
+        if (kLength == preKeyLength) {
+          flag |= FLAG_SAME_KEY_LENGTH;
+        }
+        if (vLength == preValLength) {
+          flag |= FLAG_SAME_VALUE_LENGTH;
+        }
+        if (cell.getTypeByte() == prevCell.getTypeByte()) {
+          flag |= FLAG_SAME_TYPE;
+        }
+
+        byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
+        int commonTimestampPrefix = findCommonTimestampPrefix(curTsBuf,
+            Bytes.toBytes(prevCell.getTimestamp()));
+
+        flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
+
+        // Check if current and previous values are the same. Compare value
+        // length first as an optimization.
+        if (vLength == preValLength
+            && Bytes.equals(cell.getValueArray(), cell.getValueOffset(),
+                vLength, prevCell.getValueArray(), prevCell.getValueOffset(),
+                preValLength)) {
+          flag |= FLAG_SAME_VALUE;
+        }
+
+        dos.write(flag);
+        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          ByteBufferUtils.putCompressedInt(dos, kLength);
+        }
+        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+          ByteBufferUtils.putCompressedInt(dos, vLength);
+        }
+        ByteBufferUtils.putCompressedInt(dos, commonPrefix);
+        short rLen = cell.getRowLength();
+        if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
+          // Previous and current rows are different. Copy the differing part
+          // of the row, skip the column family, and copy the qualifier.
+          CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, dos);
+          dos.write(cell.getQualifierArray(), cell.getQualifierOffset(),
+              cell.getQualifierLength());
+        } else {
+          // The common part includes the whole row. As the column family is
+          // the same across the whole file, it will automatically be included
+          // in the common prefix, so we need not special-case it here.
+          // What we write here is the non-common part of the qualifier.
+          int commonQualPrefix = commonPrefix
+              - (rLen + KeyValue.ROW_LENGTH_SIZE)
+              - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
+          dos.write(cell.getQualifierArray(), cell.getQualifierOffset()
+              + commonQualPrefix, cell.getQualifierLength() - commonQualPrefix);
+        }
+        // Write the non-common part of the timestamp
+        dos.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE
+            - commonTimestampPrefix);
+
+        // Write the type if it is not the same as before.
+        if ((flag & FLAG_SAME_TYPE) == 0) {
+          dos.write(cell.getTypeByte());
+        }
+
+        // Write the value if it is not the same as before.
+        if ((flag & FLAG_SAME_VALUE) == 0) {
+          dos.write(cell.getValueArray(), cell.getValueOffset(), vLength);
+        }
+      }
+      prevCell = cell;
+    }
+
+    private int findCommonTimestampPrefix(byte[] curTsBuf, byte[] prevTsBuf) {
+      int commonPrefix = 0;
+      while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
+          && curTsBuf[commonPrefix] == prevTsBuf[commonPrefix]) {
+        commonPrefix++;
+      }
+      return commonPrefix; // has to be at most 7 bytes
+    }
+  }
+
+  public static class FastDiffCompressionState {
+    int prevTimestampOffset;
+    int keyLength;
+    int valueLength;
+    short rowLength;
+    int prevOffset = FIRST_KEY;
+    byte familyLength;
+    int qualifierLength;
+    byte type;
+
+    private final static int FIRST_KEY = -1;
+
+    boolean isFirst() {
+      return prevOffset == FIRST_KEY;
+    }
+
+    /**
+     * Copies the first key/value from the given stream, and initializes
+     * decompression state based on it. Assumes that we have already read key
+     * and value lengths. Does not set {@link #qualifierLength} (not used by
+     * decompression) or {@link #prevOffset} (set by the caller afterwards).
+     */
+    public void decompressFirstKV(ByteBuffer out, DataInputStream in)
+        throws IOException {
+      int kvPos = out.position();
+      out.putInt(keyLength);
+      out.putInt(valueLength);
+      prevTimestampOffset = out.position() + keyLength
+          - KeyValue.TIMESTAMP_TYPE_SIZE;
+      ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
+      rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
+      familyLength = out.get(kvPos + KeyValue.ROW_OFFSET
+          + KeyValue.ROW_LENGTH_SIZE + rowLength);
+      type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
+    }
+  }
+
+  public static class FastDiffDeltaKeyValueDecoder extends BaseDecoder {
+
+    ByteBufferOutputStream baos = null;
+    DataInputStream source = null;
+    FastDiffCompressionState state = null;
+    boolean parser = false;
+    ByteBuffer decompressedData = null;
+
+    public FastDiffDeltaKeyValueDecoder(InputStream in) {
+      super(in);
+      baos = new ByteBufferOutputStream(4096);
+      source = new DataInputStream(in);
+      state = new FastDiffCompressionState();
+    }
+
+    private void uncompressSingleKeyValue() throws IOException {
+      byte flag = source.readByte();
+      int prevKeyLength = state.keyLength;
+
+      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+        state.keyLength = ByteBufferUtils.readCompressedInt(source);
+      }
+      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+        state.valueLength = ByteBufferUtils.readCompressedInt(source);
+      }
+      int commonLength = ByteBufferUtils.readCompressedInt(source);
+
+      int kvPos = baos.size();
+
+      if (state.isFirst()) {
+        // this is the first element; make room for the lengths plus the
+        // whole key/value before copying it in
+        baos.checkSizeAndGrow(KeyValue.ROW_OFFSET + state.keyLength
+            + state.valueLength);
+        state.decompressFirstKV(baos.getRawByteBuffer(), source);
+      } else {
+        // copy the prefix
+        int common;
+        int prevOffset;
+
+        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+          // write the 4-byte key and value lengths (not single bytes)
+          baos.checkSizeAndGrow(KeyValue.ROW_OFFSET);
+          baos.getRawByteBuffer().putInt(state.keyLength);
+          baos.getRawByteBuffer().putInt(state.valueLength);
+          prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
+          common = commonLength;
+        } else {
+          if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
+            prevOffset = state.prevOffset;
+            common = commonLength + KeyValue.ROW_OFFSET;
+          } else {
+            baos.checkSizeAndGrow(KeyValue.KEY_LENGTH_SIZE);
+            baos.getRawByteBuffer().putInt(state.keyLength);
+            prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
+            common = commonLength + KeyValue.KEY_LENGTH_SIZE;
+          }
+        }
+
+        baos.checkSizeAndGrow(common);
+        ByteBufferUtils.copyFromBufferToBuffer(baos.getRawByteBuffer(),
+            baos.getRawByteBuffer(), prevOffset, common);
+
+        // copy the rest of the key from the buffer
+        int keyRestLength;
+        if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+          // omit the family part of the key, it is always the same
+          int rowWithSizeLength;
+          int rowRestLength;
+
+          // check length of row
+          if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
+            baos.checkSizeAndGrow(KeyValue.ROW_LENGTH_SIZE);
+            // not yet copied, do it now
+            ByteBufferUtils.copyFromStreamToBuffer(baos.getRawByteBuffer(),
+                source, KeyValue.ROW_LENGTH_SIZE - commonLength);
+
+            rowWithSizeLength = baos.getRawByteBuffer().getShort(
+                baos.getRawByteBuffer().position() - KeyValue.ROW_LENGTH_SIZE)
+                + KeyValue.ROW_LENGTH_SIZE;
+            rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
+          } else {
+            // already in kvBuffer, just read it
+            rowWithSizeLength = baos.getRawByteBuffer().getShort(
+                kvPos + KeyValue.ROW_OFFSET) + KeyValue.ROW_LENGTH_SIZE;
+            rowRestLength = rowWithSizeLength - commonLength;
+          }
+
+          baos.checkSizeAndGrow(rowRestLength);
+          // copy the rest of row
+          ByteBufferUtils.copyFromStreamToBuffer(baos.getRawByteBuffer(),
+              source, rowRestLength);
+
+          baos.checkSizeAndGrow(state.familyLength
+              + KeyValue.FAMILY_LENGTH_SIZE);
+          // copy the column family
+          ByteBufferUtils.copyFromBufferToBuffer(baos.getRawByteBuffer(),
+              baos.getRawByteBuffer(), state.prevOffset + KeyValue.ROW_OFFSET
+                  + KeyValue.ROW_LENGTH_SIZE + state.rowLength,
+              state.familyLength + KeyValue.FAMILY_LENGTH_SIZE);
+          state.rowLength = (short) (rowWithSizeLength
+              - KeyValue.ROW_LENGTH_SIZE);
+
+          keyRestLength = state.keyLength - rowWithSizeLength
+              - state.familyLength
+              - (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
+        } else {
+          // prevRowWithSizeLength is the same as on previous row
+          keyRestLength = state.keyLength - commonLength
+              - KeyValue.TIMESTAMP_TYPE_SIZE;
+        }
+        baos.checkSizeAndGrow(keyRestLength);
+        // copy the rest of the key, after column family == column qualifier
+        ByteBufferUtils.copyFromStreamToBuffer(baos.getRawByteBuffer(), source,
+            keyRestLength);
+
+        // copy timestamp
+        int prefixTimestamp =
+            (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
+        baos.checkSizeAndGrow(prefixTimestamp);
+        ByteBufferUtils.copyFromBufferToBuffer(baos.getRawByteBuffer(),
+            baos.getRawByteBuffer(), state.prevTimestampOffset,
+            prefixTimestamp);
+        state.prevTimestampOffset = baos.getRawByteBuffer().position()
+            - prefixTimestamp;
+        baos.checkSizeAndGrow(KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
+        ByteBufferUtils.copyFromStreamToBuffer(baos.getRawByteBuffer(), source,
+            KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
+
+        // copy the type and value
+        if ((flag & FLAG_SAME_TYPE) != 0) {
+          baos.checkSizeAndGrow(KeyValue.TYPE_SIZE + state.valueLength);
+          baos.getRawByteBuffer().put(state.type);
+          if ((flag & FLAG_SAME_VALUE) != 0) {
+            ByteBufferUtils.copyFromBufferToBuffer(baos.getRawByteBuffer(),
+                baos.getRawByteBuffer(), state.prevOffset + KeyValue.ROW_OFFSET
+                    + prevKeyLength, state.valueLength);
+          } else {
+            ByteBufferUtils.copyFromStreamToBuffer(baos.getRawByteBuffer(),
+                source, state.valueLength);
+          }
+        } else {
+          baos.checkSizeAndGrow(state.valueLength + KeyValue.TYPE_SIZE);
+          if ((flag & FLAG_SAME_VALUE) != 0) {
+            ByteBufferUtils.copyFromStreamToBuffer(baos.getRawByteBuffer(),
+                source, KeyValue.TYPE_SIZE);
+            ByteBufferUtils.copyFromBufferToBuffer(baos.getRawByteBuffer(),
+                baos.getRawByteBuffer(), state.prevOffset + KeyValue.ROW_OFFSET
+                    + prevKeyLength, state.valueLength);
+          } else {
+            ByteBufferUtils.copyFromStreamToBuffer(baos.getRawByteBuffer(),
+                source, state.valueLength + KeyValue.TYPE_SIZE);
+          }
+          state.type = baos.getRawByteBuffer().get(
+              state.prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
+        }
+      }
+      state.prevOffset = kvPos;
+    }
+
+    public void internalDecodeKeyValues() throws IOException {
+      while (source.available() > 0) {
+        uncompressSingleKeyValue();
+      }
+      decompressedData = baos.getByteBuffer();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (!parser) {
+        return super.advance();
+      } else {
+        if (decompressedData.hasRemaining()) {
+          this.current = parseCell();
+          return true;
+        } else {
+          return false;
+        }
+      }
+    }
+
+    @Override
+    protected Cell parseCell() throws IOException {
+      if (!parser) {
+        internalDecodeKeyValues();
+        parser = true;
+      }
+      if (decompressedData.hasRemaining()) {
+        int offset = decompressedData.position();
+        int klen = decompressedData.getInt();
+        int vlen = decompressedData.getInt();
+        ByteBufferUtils.skip(decompressedData, klen + vlen);
+        KeyValue kv = new KeyValue(decompressedData.array(), offset,
+            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, 0));
+        return kv;
+      }
+      return null;
+    }
+  }
+
+  @Override
+  public Decoder getDecoder(InputStream is) {
+    return new FastDiffDeltaKeyValueDecoder(is);
+  }
+
+  @Override
+  public Encoder getEncoder(OutputStream os) {
+    return new FastDiffDeltaKeyValueEncoder(os);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/PerfCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/PerfCodec.java
new file mode 100644
index 0000000..7f0ce49
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/PerfCodec.java
@@ -0,0 +1,236 @@
+package org.apache.hadoop.hbase.codec;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+
+import com.google.common.io.CountingInputStream;
+import com.google.common.io.CountingOutputStream;
+
+public class PerfCodec {
+
+  public static void testKeyValueCodec() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CountingOutputStream cos = new CountingOutputStream(baos);
+    DataOutputStream dos = new DataOutputStream(cos);
+    KeyValueCodec kvc = new KeyValueCodec();
+    Codec.Encoder encoder = kvc.getEncoder(dos);
+    final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
+        Bytes.toBytes("1"), Bytes.toBytes("1"));
+    final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
+        Bytes.toBytes("2"), Bytes.toBytes("2"));
+    final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
+        Bytes.toBytes("3"), Bytes.toBytes("3"));
+    encoder.write(kv1);
+    encoder.write(kv2);
+    encoder.write(kv3);
+    encoder.flush();
+    dos.close();
+    System.out.println("KeyValueCodec size " + baos.toByteArray().length);
+    CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(
+        baos.toByteArray()));
+    DataInputStream dis = new DataInputStream(cis);
+    Codec.Decoder decoder = kvc.getDecoder(dis);
+    assertTrue(decoder.advance());
+    KeyValue kv = (KeyValue) decoder.current();
+    assertTrue(kv1.equals(kv));
+    assertTrue(decoder.advance());
+    kv = (KeyValue) decoder.current();
+    assertTrue(kv2.equals(kv));
+    assertTrue(decoder.advance());
+    kv = (KeyValue) decoder.current();
+    assertTrue(kv3.equals(kv));
+    assertFalse(decoder.advance());
+    dis.close();
+  }
+
+  public static void testFastDiffKeyValueCodec() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CountingOutputStream cos = new CountingOutputStream(baos);
+    DataOutputStream dos = new DataOutputStream(cos);
+    FastDiffDeltaKeyValueCodec kvc = new FastDiffDeltaKeyValueCodec();
+    Codec.Encoder encoder = kvc.getEncoder(dos);
+    final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
+        Bytes.toBytes("1"), Bytes.toBytes("1"));
+    final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
+        Bytes.toBytes("2"), Bytes.toBytes("2"));
+    final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
+        Bytes.toBytes("3"), Bytes.toBytes("3"));
+    encoder.write(kv1);
+    encoder.write(kv2);
+    encoder.write(kv3);
+    encoder.flush();
+    dos.close();
+    System.out.println("FastDiffDeltaKeyValueCodec size "
+        + baos.toByteArray().length);
+    CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(
+        baos.toByteArray()));
+    DataInputStream dis = new DataInputStream(cis);
+    Codec.Decoder decoder = kvc.getDecoder(dis);
+    assertTrue(decoder.advance());
+    KeyValue kv = (KeyValue) decoder.current();
+    assertTrue(kv1.equals(kv));
+    assertTrue(decoder.advance());
+    kv = (KeyValue) decoder.current();
+    assertTrue(kv2.equals(kv));
+    assertTrue(decoder.advance());
+    kv = (KeyValue) decoder.current();
+    assertTrue(kv3.equals(kv));
+    assertFalse(decoder.advance());
+    dis.close();
+  }
+
+  public static void testCodecPerf(Codec codec, int numRow, int qualifierNum,
+      int valueLength) throws IOException {
+    long encodeTime = 0;
+    long decodeTime = 0;
+    long size = 0;
+    for (int r = 0; r < numRow; r++) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      CountingOutputStream cos = new CountingOutputStream(baos);
+      DataOutputStream dos = new DataOutputStream(cos);
+      Codec.Encoder encoder = codec.getEncoder(dos);
+      List<KeyValue> kvs = getKeyValueList(qualifierNum, valueLength);
+
+      long startTime = System.nanoTime();
+      for (int i = 0; i < kvs.size(); i++) {
+        encoder.write(kvs.get(i));
+      }
+      encoder.flush();
+      dos.close();
+      long endTime = System.nanoTime();
+      encodeTime += (endTime - startTime);
+
+      CountingInputStream cis = new CountingInputStream(
+          new ByteArrayInputStream(baos.toByteArray()));
+      size += baos.toByteArray().length;
+      DataInputStream dis = new DataInputStream(cis);
+
+      startTime = System.nanoTime();
+      Codec.Decoder decoder = codec.getDecoder(dis);
+      while (decoder.advance()) {
+        decoder.current();
+      }
+      dis.close();
+      endTime = System.nanoTime();
+      decodeTime += (endTime - startTime);
+    }
+    System.out.println(codec.getClass().getName() + " encodeTime " + encodeTime
+        / 1000000 + " decodeTime " + decodeTime / 1000000 + ", total size "
+        + size + ", avg size " + size / numRow);
+    System.out.println(new Date() + " avg encodeTime " + encodeTime / numRow
+        + " ns, avg decodeTime " + decodeTime / numRow + " ns");
+  }
+
+  static Random random = new Random();
+
+  public static List<KeyValue> getKeyValueList(int qualifierNum,
+      int valueLength) {
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    long row = random.nextLong();
+    for (int i = 0; i < qualifierNum; i++) {
+      KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("f"),
+          Bytes.toBytes("q" + i), Bytes.toBytes(getRandomString(valueLength)));
+      kvs.add(kv);
+    }
+    return kvs;
+  }
+
+  // length is the desired length of the generated random string
+  public static String getRandomString(int length) {
+    String base = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < length; i++) {
+      int number = random.nextInt(base.length());
+      sb.append(base.charAt(number));
+    }
+    return sb.toString();
+  }
+
+  public static void testCodecWithCompressionPerf(Codec codec, int numRow,
+      int qualifierNum, int valueLength) throws IOException {
+    long encodeTime = 0;
+    long decodeTime = 0;
+    long size = 0;
+    Algorithm gzAl = Compression.getCompressionAlgorithmByName("gz");
+    for (int r = 0; r < numRow; r++) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      CompressionOutputStream cos = gzAl.createPlainCompressionStream(baos,
+          gzAl.getCompressor());
+      Codec.Encoder encoder = codec.getEncoder(cos);
+      List<KeyValue> kvs = getKeyValueList(qualifierNum, valueLength);
+
+      long startTime = System.nanoTime();
+      for (int i = 0; i < kvs.size(); i++) {
+        encoder.write(kvs.get(i));
+      }
+      encoder.flush();
+      cos.close();
+      long endTime = System.nanoTime();
+      encodeTime += (endTime - startTime);
+
+      ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+      size += baos.toByteArray().length;
+
+      InputStream is = gzAl.createDecompressionStream(bais,
+          gzAl.getDecompressor(), 8192);
+      ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
+      startTime = System.nanoTime();
+      IOUtils.copy(is, baos2);
+      endTime = System.nanoTime();
+      decodeTime += (endTime - startTime);
+      ByteArrayInputStream dis = new ByteArrayInputStream(baos2.toByteArray());
+
+      startTime = System.nanoTime();
+      Codec.Decoder decoder = codec.getDecoder(dis);
+      while (decoder.advance()) {
+        decoder.current();
+      }
+      dis.close();
+      endTime = System.nanoTime();
+      decodeTime += (endTime - startTime);
+    }
+    System.out.println("Compression " + codec.getClass().getName()
+        + " encodeTime " + encodeTime / 1000000 + " decodeTime " + decodeTime
+        / 1000000 + ", total size " + size + ", avg size " + size / numRow);
+    System.out.println(new Date() + " avg encodeTime " + encodeTime / numRow
+        + " ns, avg decodeTime " + decodeTime / numRow + " ns");
+  }
+
+  public static void main(String[] args) throws IOException {
+    testFastDiffKeyValueCodec();
+    testKeyValueCodec();
+
+    int numRow = 1000000;
+    int qualifierNum = 10;
+    int valueLength = 100;
+
+    FastDiffDeltaKeyValueCodec fdkvc = new FastDiffDeltaKeyValueCodec();
+    testCodecPerf(fdkvc, numRow, qualifierNum, valueLength);
+    testCodecPerf(fdkvc, numRow, qualifierNum, valueLength);
+    KeyValueCodec kvc = new KeyValueCodec();
+    testCodecPerf(kvc, numRow, qualifierNum, valueLength);
+    testCodecPerf(kvc, numRow, qualifierNum, valueLength);
+
+    KeyValueCodec kvc2 = new KeyValueCodec();
+    testCodecWithCompressionPerf(kvc2, numRow, qualifierNum, valueLength);
+    testCodecWithCompressionPerf(kvc2, numRow, qualifierNum, valueLength);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index 1b2ab5d..7f8cf28 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -72,6 +72,10 @@ public class ByteBufferOutputStream extends OutputStream {
     return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
   }
 
+  public ByteBuffer getRawByteBuffer() {
+    return buf;
+  }
+
   /**
    * This flips the underlying BB so be sure to use it _last_!
    * @return ByteBuffer
@@ -81,7 +85,7 @@ public class ByteBufferOutputStream extends OutputStream {
     return buf;
   }
 
-  private void checkSizeAndGrow(int extra) {
+  public void checkSizeAndGrow(int extra) {
     if ( (buf.position() + extra) > buf.limit()) {
       // size calculation is complex, because we could overflow negative,
       // and/or not allocate enough space. this fixes that.