diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 3b43bfd..d70099e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -21,24 +21,20 @@ import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -144,7 +140,7 @@ class IPCUtil { throws IOException { // If compressed, decompress it first before passing it on else we will leak compression // resources if the stream is not closed properly after we let it out. - InputStream is = null; + DataInputStream dis = null; if (compressor != null) { // GZIPCodec fails w/ NPE if no configuration. if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); @@ -160,15 +156,15 @@ class IPCUtil { IOUtils.copy(cis, bbos); bbos.close(); ByteBuffer bb = bbos.getByteBuffer(); - is = new ByteArrayInputStream(bb.array(), 0, bb.limit()); + dis = new DataInputStream(new ByteArrayInputStream(bb.array(), 0, bb.limit())); } finally { - if (is != null) is.close(); + if (dis != null) dis.close(); CodecPool.returnDecompressor(poolDecompressor); } } else { - is = new ByteArrayInputStream(cellBlock, offset, length); + dis = new DataInputStream(new ByteArrayInputStream(cellBlock, offset, length)); } - return codec.getDecoder(is); + return codec.getDecoder(dis); } /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java index 3b95c53..e46920c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java @@ -17,17 +17,17 @@ */ package org.apache.hadoop.hbase.codec; +import java.io.DataInputStream; import java.io.IOException; -import java.io.InputStream; import org.apache.hadoop.hbase.Cell; public abstract class BaseDecoder implements Codec.Decoder { - protected final InputStream in; + protected final DataInputStream in; private boolean hasNext = true; private Cell current = null; - public BaseDecoder(final InputStream in) { + public BaseDecoder(final DataInputStream in) { this.in = in; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java index 7fa8695..0443476 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.codec; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -69,7 +70,7 @@ public class CellCodec implements Codec { } static class CellDecoder extends BaseDecoder { - public CellDecoder(final InputStream in) { + public CellDecoder(final DataInputStream in) { super(in); } @@ -100,7 +101,7 @@ public class CellCodec implements Codec { } @Override - public Decoder getDecoder(InputStream is) { + public Decoder getDecoder(DataInputStream is) { return new CellDecoder(is); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java index a89cc2b..a069cb0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.codec; -import java.io.InputStream; +import java.io.DataInputStream; import java.io.OutputStream; import org.apache.hadoop.hbase.CellScanner; @@ -46,6 +46,6 @@ public interface Codec { */ public interface Decoder extends CellScanner {}; - Decoder getDecoder(InputStream is); + Decoder getDecoder(DataInputStream dis); Encoder getEncoder(OutputStream os); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java index 0cf2dae..c386b92 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.codec; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -58,7 +59,7 @@ public class KeyValueCodec implements Codec { } public static class KeyValueDecoder extends BaseDecoder { - public KeyValueDecoder(final InputStream in) { + public KeyValueDecoder(final DataInputStream in) { super(in); } @@ -71,7 +72,7 @@ public class KeyValueCodec implements Codec { * Implementation depends on {@link InputStream#available()} */ @Override - public Decoder getDecoder(final InputStream is) { + public Decoder getDecoder(final DataInputStream is) { return new KeyValueDecoder(is); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java index d7183d9..9d422cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.codec; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -66,7 +67,7 @@ public class MessageCodec implements Codec { } static class MessageDecoder extends BaseDecoder { - MessageDecoder(final InputStream in) { + MessageDecoder(final DataInputStream in) { super(in); } @@ -80,7 +81,7 @@ public class MessageCodec implements Codec { } @Override - public Decoder getDecoder(InputStream is) { + public Decoder getDecoder(DataInputStream is) { return new MessageDecoder(is); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 245abba..72ade47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -158,7 +159,7 @@ public class WALCellCodec implements Codec { static class CompressedKvDecoder extends BaseDecoder { private final CompressionContext compression; - public CompressedKvDecoder(InputStream in, CompressionContext compression) { + public CompressedKvDecoder(DataInputStream in, CompressionContext compression) { super(in); this.compression = compression; } @@ -236,7 +237,7 @@ public class WALCellCodec implements Codec { } @Override - public Decoder getDecoder(InputStream is) { + public Decoder getDecoder(DataInputStream is) { return (compression == null) ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java index 70dd11a..3137ba9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; import java.io.IOException; import org.apache.commons.logging.Log; @@ -113,7 +114,8 @@ public class CodecPerformance { } for (int i = 0; i < cycles; i++) { ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - Codec.Decoder decoder = codec.getDecoder(bais); + DataInputStream dis = new DataInputStream(bais); + Codec.Decoder decoder = codec.getDecoder(dis); cellsDecoded = CodecPerformance.runDecoderTest(i, count, decoder); } verifyCells(cells, cellsDecoded);