diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/EliasGammaCompressor.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/EliasGammaCompressor.java
new file mode 100644
index 0000000..8d6e4a6
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/EliasGammaCompressor.java
@@ -0,0 +1,114 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.compressors;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hive.contrib.ubercompressor.InputReader;
+import org.apache.hadoop.hive.contrib.ubercompressor.OutputWriter;
+import org.apache.hadoop.hive.contrib.ubercompressor.TypeSpecificCompressor;
+import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple;
+import org.apache.hadoop.hive.contrib.ubercompressor.io.BitInputStream;
+import org.apache.hadoop.hive.contrib.ubercompressor.io.BitOutputStream;
+import org.apache.hadoop.hive.contrib.ubercompressor.io.impl.BitInputStreamImpl;
+import org.apache.hadoop.hive.contrib.ubercompressor.io.impl.BitOutputStreamImpl;
+import org.apache.hadoop.hive.contrib.ubercompressor.model.EventHelperBijectiveDRV;
+
+/**
+ * Elias gamma compressor for types that map bijectively onto the integers.
+ * Each value is folded onto the positive integers (positive i becomes 2*i,
+ * zero or negative i becomes 1 - 2*i) and the result is gamma coded.
+ */
+class EliasGammaCompressor<T> implements TypeSpecificCompressor<T> {
+
+  public EliasGammaCompressor(EventHelperBijectiveDRV<T> eventHelper) {
+    super();
+    this.mEventHelper = eventHelper;
+  }
+
+  private EventHelperBijectiveDRV<T> mEventHelper;
+
+  private static class MyOutputStream<T> implements OutputWriter<T> {
+
+    private EventHelperBijectiveDRV<T> mEventHelper;
+    private BitOutputStream mOut;
+
+    MyOutputStream(BitOutputStream out, EventHelperBijectiveDRV<T> eventHelper) {
+      mOut = out;
+      mEventHelper = eventHelper;
+    }
+
+    @Override
+    public void write(T colValue) throws IOException {
+      long i = mEventHelper.getDrv(colValue);
+      // TODO input validation
+      long oneSidedPosValue;
+      if (i > 0) {
+        oneSidedPosValue = 2 * i;
+      } else {
+        oneSidedPosValue = 1 - 2 * i;
+      }
+      actualGammaEncode(oneSidedPosValue);
+    }
+
+    private void actualGammaEncode(long i) throws IOException {
+      long tmp = i;
+      int bitlen = 0;
+      while (tmp > 0) {
+        tmp >>>= 1;
+        ++bitlen;
+      }
+      // gamma code: (bitlen - 1) zero bits, then the value itself in bitlen bits
+      mOut.writeBits(0, bitlen - 1);
+      mOut.writeBits(i, bitlen);
+    }
+
+    @Override
+    public long close(boolean closeInner) throws IOException {
+      return mOut.close(closeInner);
+    }
+  }
+
+  private static class MyInputStream<T> implements InputReader<T> {
+
+    private EventHelperBijectiveDRV<T> mEventHelper;
+    private BitInputStream mIn;
+
+    MyInputStream(BitInputStream in, EventHelperBijectiveDRV<T> eventHelper) {
+      mIn = in;
+      this.mEventHelper = eventHelper;
+    }
+
+    private long actualGammaDecode() throws IOException {
+      int bitlen = 0;
+      while (mIn.readBit() != 1) {
+        ++bitlen;
+      }
+      // the leading 1 bit has already been consumed; read the remaining bits and restore it
+      return mIn.readBits(bitlen) | (0x01L << bitlen);
+    }
+
+    @Override
+    public Tuple<Boolean, T> read() throws IOException {
+      try {
+        long rc = actualGammaDecode();
+        int i = (int) (rc / 2);
+        if (2 * i != rc) {
+          i = -i;
+        }
+        return new Tuple<Boolean, T>(false, mEventHelper.getEvent(i));
+      } catch (EOFException eof) {
+        return new Tuple<Boolean, T>(true, null);
+      }
+    }
+
+    @Override
+    public void close(boolean closeInner) throws IOException {
+      mIn.close(closeInner);
+    }
+  }
+
+  @Override
+  public OutputWriter<T> createOutputWriter(OutputStream out) {
+    return new MyOutputStream<T>(new BitOutputStreamImpl(out), mEventHelper);
+  }
+
+  @Override
+  public InputReader<T> createInputReader(InputStream in, long numBits) {
+    // bound the reader by the recorded bit count so trailing padding bits are not decoded
+    return new MyInputStream<T>(new BitInputStreamImpl(in, numBits), mEventHelper);
+  }
+}
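// Illustration only, not part of the patch: a minimal standalone sketch of the
// folding-plus-Elias-gamma scheme that EliasGammaCompressor above implements.
// Codewords are rendered as strings for readability; the class and method names
// here (EliasGammaSketch, fold, gamma) are made up for this example.
public class EliasGammaSketch {
  // Fold a signed value onto the positive integers, as write() does:
  // positive i -> 2*i (even), zero/negative i -> 1 - 2*i (odd).
  static long fold(long i) {
    return i > 0 ? 2 * i : 1 - 2 * i;
  }

  // Gamma-code a positive value: (bitlen - 1) zero bits, then the value itself.
  static String gamma(long v) {
    int bitlen = 64 - Long.numberOfLeadingZeros(v);
    StringBuilder sb = new StringBuilder();
    for (int k = 0; k < bitlen - 1; k++) {
      sb.append('0');
    }
    sb.append(Long.toBinaryString(v));
    return sb.toString();
  }

  public static void main(String[] args) {
    // 0 -> folded 1 -> "1"; 1 -> folded 2 -> "010"; -3 -> folded 7 -> "00111"
    for (long i : new long[] {0, 1, -3, 5}) {
      System.out.println(i + " -> " + gamma(fold(i)));
    }
  }
}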
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/IntegerEliasGamma.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/IntegerEliasGamma.java
new file mode 100644
index 0000000..eaef73d
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/IntegerEliasGamma.java
@@ -0,0 +1,11 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.compressors;
+
+import org.apache.hadoop.hive.contrib.ubercompressor.model.impl.IntegerEventHelper;
+
+public class IntegerEliasGamma extends EliasGammaCompressor<Integer> {
+
+  public IntegerEliasGamma() {
+    super(new IntegerEventHelper());
+  }
+
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/BitInputStream.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/BitInputStream.java
new file mode 100644
index 0000000..cee0c58
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/BitInputStream.java
@@ -0,0 +1,21 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.io;
+
+import java.io.IOException;
+
+public interface BitInputStream {
+
+  public byte readBit() throws IOException;
+
+  public int readBits(int numBits) throws IOException;
+
+  public void skipBit() throws IOException;
+
+  public void skipBits(int numBits) throws IOException;
+
+  public byte peekBit() throws IOException;
+
+  public int peekBits(int numBits) throws IOException;
+
+  public void close(boolean closeInner) throws IOException;
+
+}
\ No newline at end of file
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/BitOutputStream.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/BitOutputStream.java
new file mode 100644
index 0000000..7c6e13f
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/BitOutputStream.java
@@ -0,0 +1,19 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.io;
+
+import java.io.IOException;
+
+public interface BitOutputStream {
+
+  public void writeBit(byte bit) throws IOException;
+
+  public void writeBits(long data, int numBits) throws IOException;
+
+  /**
+   * @param closeInner if true, closes the underlying stream as well
+   * @return the number of bits written, -1 if unknown
+   * @throws IOException
+   */
+  public long close(boolean closeInner) throws IOException;
+
+}
\ No newline at end of file
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/impl/BitInputStreamImpl.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/impl/BitInputStreamImpl.java
new file mode 100644
index 0000000..6cf4986
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/impl/BitInputStreamImpl.java
@@ -0,0 +1,244 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.io.impl;
+
+import java.io.BufferedInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.hive.contrib.ubercompressor.io.BitInputStream;
+
+/**
+ * @author krishnak
+ *
+ */
+public class BitInputStreamImpl implements BitInputStream {
+
+  // peeking relies on mark/reset; peeking more than READ_LIMIT bytes past the
+  // last read position would invalidate the mark
+  private static final int READ_LIMIT = 1024;
+  private BufferedInputStream mIn;
+  private long mTotalBits = -1;
+
+  private byte currentReadByte;
+  private int bitReadIndex = -1;
+  private long mTotalReadBits = 0;
+
+  private byte currentPeekedByte;
+  private int bitPeekIndex = -1;
+  private long mTotalPeekedBits = 0;
+
+  public BitInputStreamImpl(InputStream in) {
+    mIn = new BufferedInputStream(in);
+    mIn.mark(READ_LIMIT);
+  }
+
+  public BitInputStreamImpl(InputStream in, long totalBits) {
+    mIn = new BufferedInputStream(in);
+    mIn.mark(READ_LIMIT);
+    mTotalBits = totalBits;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitInputStream#readBit()
+   */
+  @Override
+  public byte readBit() throws IOException {
+    if (mTotalBits != -1) {
+      if (mTotalBits <= mTotalReadBits) {
+        throw new EOFException();
+      }
+    }
+    if (bitReadIndex < 0) {
+      readByte();
+    }
+    byte rc = (byte) ((currentReadByte >>> bitReadIndex) & 0x01);
+    --bitReadIndex;
+    ++mTotalReadBits;
+    resetPeek();
+    return rc;
+  }
+
+  private void resetPeek() throws IOException {
+    currentPeekedByte = currentReadByte;
+    bitPeekIndex = bitReadIndex;
+    mIn.reset();
+    mTotalPeekedBits = mTotalReadBits;
+  }
+
+  private static final int BITMASK[] = {
+      0x00000001, 0x00000003,
+      0x00000007, 0x0000000F,
+      0x0000001F, 0x0000003F,
+      0x0000007F, 0x000000FF };
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitInputStream#readBits(int)
+   */
+  @Override
+  public int readBits(int numBits) throws IOException {
+    assert (numBits <= 32);
+    if (mTotalBits != -1) {
+      if (mTotalBits < mTotalReadBits + numBits) {
+        throw new EOFException();
+      }
+    }
+    int rc = 0;
+    while (numBits > 0) {
+      if (bitReadIndex < 0) {
+        readByte();
+      }
+      if (bitReadIndex + 1 >= numBits) {
+        int tmp = ((int) (currentReadByte >>> (bitReadIndex + 1 - numBits)) & BITMASK[numBits - 1]);
+        rc <<= numBits;
+        rc |= tmp;
+        bitReadIndex -= numBits;
+        mTotalReadBits += numBits;
+        resetPeek();
+        return rc;
+      }
+      int numBitsToReadInCurrentByte = bitReadIndex + 1;
+      int tmp = ((int) currentReadByte) & BITMASK[numBitsToReadInCurrentByte - 1];
+      rc <<= numBitsToReadInCurrentByte;
+      rc |= tmp;
+      numBits -= numBitsToReadInCurrentByte;
+      bitReadIndex -= numBitsToReadInCurrentByte;
+      mTotalReadBits += numBitsToReadInCurrentByte;
+    }
+    resetPeek();
+    return rc;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitInputStream#skipBit()
+   */
+  @Override
+  public void skipBit() throws IOException {
+    if (mTotalBits != -1) {
+      if (mTotalBits <= mTotalReadBits) {
+        throw new EOFException();
+      }
+    }
+    if (bitReadIndex < 0) {
+      readByte();
+    }
+    --bitReadIndex;
+    ++mTotalReadBits;
+    resetPeek();
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitInputStream#skipBits(int)
+   */
+  @Override
+  public void skipBits(int numBits) throws IOException {
+    assert (numBits <= 32);
+    if (mTotalBits != -1) {
+      if (mTotalBits < mTotalReadBits + numBits) {
+        throw new EOFException();
+      }
+    }
+    while (numBits > 0) {
+      if (bitReadIndex < 0) {
+        readByte();
+      }
+      if (bitReadIndex + 1 >= numBits) {
+        bitReadIndex -= numBits;
+        mTotalReadBits += numBits;
+        resetPeek();
+        return;
+      }
+      int numBitsToReadInCurrentByte = bitReadIndex + 1;
+      numBits -= numBitsToReadInCurrentByte;
+      bitReadIndex -= numBitsToReadInCurrentByte;
+      mTotalReadBits += numBitsToReadInCurrentByte;
+    }
+    resetPeek();
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitInputStream#peekBit()
+   */
+  @Override
+  public byte peekBit() throws IOException {
+    if (mTotalBits != -1) {
+      if (mTotalBits <= mTotalPeekedBits) {
+        throw new EOFException();
+      }
+    }
+    if (bitPeekIndex < 0) {
+      peekByte();
+    }
+    byte rc = (byte) ((currentPeekedByte >>> bitPeekIndex) & 0x01);
+    --bitPeekIndex;
+    ++mTotalPeekedBits;
+    return rc;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitInputStream#peekBits(int)
+   */
+  @Override
+  public int peekBits(int numBits) throws IOException {
+    assert (numBits <= 32);
+    if (mTotalBits != -1) {
+      if (mTotalBits < mTotalPeekedBits + numBits) {
+        throw new EOFException();
+      }
+    }
+    int rc = 0;
+    while (numBits > 0) {
+      if (bitPeekIndex < 0) {
+        peekByte();
+      }
+      if (bitPeekIndex + 1 >= numBits) {
+        int tmp = ((int) (currentPeekedByte >>> (bitPeekIndex + 1 - numBits)) & BITMASK[numBits - 1]);
+        rc <<= numBits;
+        rc |= tmp;
+        bitPeekIndex -= numBits;
+        // track the peek counter, not the read counter
+        mTotalPeekedBits += numBits;
+        return rc;
+      }
+      int numBitsToReadInCurrentByte = bitPeekIndex + 1;
+      int tmp = ((int) currentPeekedByte) & BITMASK[numBitsToReadInCurrentByte - 1];
+      rc <<= numBitsToReadInCurrentByte;
+      rc |= tmp;
+      numBits -= numBitsToReadInCurrentByte;
+      bitPeekIndex -= numBitsToReadInCurrentByte;
+      mTotalPeekedBits += numBitsToReadInCurrentByte;
+    }
+    return rc;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitInputStream#close(boolean)
+   */
+  @Override
+  public void close(boolean closeInner) throws IOException {
+    if (closeInner) {
+      mIn.close();
+    }
+  }
+
+  private void peekByte() throws IOException {
+    int rc = mIn.read();
+    if (rc == -1) {
+      throw new EOFException();
+    }
+    currentPeekedByte = (byte) rc;
+    bitPeekIndex = 7;
+  }
+
+  private void readByte() throws IOException {
+    mIn.reset();
+    int rc = mIn.read();
+    if (rc == -1) {
+      throw new EOFException();
+    }
+    currentReadByte = (byte) rc;
+    mIn.mark(READ_LIMIT);
+    bitReadIndex = 7;
+  }
+
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/impl/BitOutputStreamImpl.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/impl/BitOutputStreamImpl.java
new file mode 100644
index 0000000..c614540
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/io/impl/BitOutputStreamImpl.java
@@ -0,0 +1,91 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.io.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.contrib.ubercompressor.io.BitOutputStream;
+
+public class BitOutputStreamImpl implements BitOutputStream {
+
+  private static final Log LOG = LogFactory.getLog(BitOutputStreamImpl.class);
+
+  private OutputStream mOut;
+  private byte currentByte;
+  private int bitIndex = 7;
+  private long totalBits = 0;
+
+  public BitOutputStreamImpl(OutputStream out) {
+    mOut = out;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitOutputStream#writeBit(byte)
+   */
+  @Override
+  public void writeBit(byte bit) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("BitOut: " + Integer.toBinaryString(bit));
+    }
+    currentByte |= (bit & 0x01) << bitIndex;
+    --bitIndex;
+    if (bitIndex < 0) {
+      mOut.write(currentByte);
+      bitIndex = 7;
+      currentByte = 0;
+    }
+    ++totalBits;
+  }
+
+  private static final byte[] BITMASK = { 0x01, 0x03, 0x07, 0x0F, 0x1F, 0x3F, 0x7F, (byte) 0xFF };
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitOutputStream#writeBits(long, int)
+   */
+  @Override
+  public void writeBits(long data, int numBits) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      String s = "0000000000000000000000000000000000000000000000000000000000000000"
+          + Long.toBinaryString(data);
+      String s1 = s.substring(s.length() - numBits);
+      if (s1.length() > 0) {
+        LOG.trace("BitsOut: " + s1);
+      }
+    }
+    assert (numBits <= 64);
+    totalBits += numBits;
+    while (numBits > 0) {
+      if (bitIndex + 1 >= numBits) {
+        currentByte |= (data & BITMASK[numBits - 1]) << (bitIndex + 1 - numBits);
+        bitIndex -= numBits;
+        if (bitIndex < 0) {
+          mOut.write(currentByte);
+          bitIndex = 7;
+          currentByte = 0;
+        }
+        return;
+      }
+      int numBitsToWriteInCurrentByte = bitIndex + 1;
+      byte tmp =
+          (byte) ((data >>> (numBits - numBitsToWriteInCurrentByte)) & BITMASK[numBitsToWriteInCurrentByte - 1]);
+      currentByte |= tmp;
+      mOut.write(currentByte);
+      bitIndex = 7;
+      currentByte = 0;
+      numBits -= numBitsToWriteInCurrentByte;
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.contrib.ubercompressor.io.BitOutputStream#close(boolean)
+   */
+  @Override
+  public long close(boolean closeInner) throws IOException {
+    if (bitIndex != 7) {
+      // flush the final, partially filled byte; its low bits are zero padding
+      mOut.write(currentByte);
+    }
+    mOut.flush();
+    if (closeInner) {
+      mOut.close();
+    }
+    return totalBits;
+  }
+
+}
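// Illustration only, not part of the patch: a round-trip sanity check for the two
// bit stream implementations above, using only constructors and methods introduced
// in this patch. The test class name (BitStreamRoundTrip) is made up for this sketch.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.contrib.ubercompressor.io.impl.BitInputStreamImpl;
import org.apache.hadoop.hive.contrib.ubercompressor.io.impl.BitOutputStreamImpl;

public class BitStreamRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    BitOutputStreamImpl out = new BitOutputStreamImpl(bytes);
    out.writeBits(0x5, 3);               // bits: 101
    out.writeBit((byte) 1);              // bits: 1
    out.writeBits(0x2A, 6);              // bits: 101010
    long bitsWritten = out.close(true);  // 10 bits, padded to 2 bytes on disk

    BitInputStreamImpl in =
        new BitInputStreamImpl(new ByteArrayInputStream(bytes.toByteArray()), bitsWritten);
    System.out.println(in.readBits(3));  // 5
    System.out.println(in.peekBit());    // 1 (does not advance the read position)
    System.out.println(in.readBit());    // 1
    System.out.println(in.readBits(6));  // 42
    in.close(true);
  }
}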
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/EventHelper.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/EventHelper.java
new file mode 100644
index 0000000..0c85c96
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/EventHelper.java
@@ -0,0 +1,5 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.model;
+
+public interface EventHelper {
+
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/EventHelperBijectiveDRV.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/EventHelperBijectiveDRV.java
new file mode 100644
index 0000000..9b5e8f0
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/EventHelperBijectiveDRV.java
@@ -0,0 +1,5 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.model;
+
+public interface EventHelperBijectiveDRV<T> extends EventHelperDRV<T> {
+  public T getEvent(int drv);
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/EventHelperDRV.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/EventHelperDRV.java
new file mode 100644
index 0000000..89ab790
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/EventHelperDRV.java
@@ -0,0 +1,5 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.model;
+
+public interface EventHelperDRV<T> extends EventHelper {
+  public int getDrv(T t);
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/impl/IntegerEventHelper.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/impl/IntegerEventHelper.java
new file mode 100644
index 0000000..8ae5c2b
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/model/impl/IntegerEventHelper.java
@@ -0,0 +1,22 @@
+package org.apache.hadoop.hive.contrib.ubercompressor.model.impl;
+
+import org.apache.hadoop.hive.contrib.ubercompressor.model.EventHelperBijectiveDRV;
+
+public class IntegerEventHelper
+    implements EventHelperBijectiveDRV<Integer> {
+
+  @Override
+  public int getDrv(Integer t) {
+    if (t == null) {
+      throw new IllegalArgumentException("IntegerEventHelper does not support null values");
+    }
+    return t.intValue();
+  }
+
+  @Override
+  public Integer getEvent(int drv) {
+    return Integer.valueOf(drv);
+  }
+
+}
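// Illustration only, not part of the patch: encoding a column of ints with
// IntegerEliasGamma and reporting the encoded size. Only the write-side API that
// appears in this patch is used (createOutputWriter, OutputWriter.write, close);
// the reader side returns Tuple objects whose accessors are not shown in the patch,
// so it is omitted here. The example class name is made up for this sketch.
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.contrib.ubercompressor.OutputWriter;
import org.apache.hadoop.hive.contrib.ubercompressor.compressors.IntegerEliasGamma;

public class IntegerEliasGammaExample {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    OutputWriter<Integer> writer = new IntegerEliasGamma().createOutputWriter(bytes);
    int[] column = {0, 1, -1, 7, 42, -100};
    for (int v : column) {
      writer.write(v);  // each value is folded to a positive integer, then gamma coded
    }
    long numBits = writer.close(true);
    System.out.println(numBits + " bits -> " + bytes.size() + " bytes");
  }
}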