diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/InputReader.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/InputReader.java
new file mode 100644
index 0000000..ecebb3a
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/InputReader.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.ubercompressor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple;
+
+/**
+ *
+ * InputReader. A reader for instances of type T.
+ *
+ * @param <T> the type
+ */
+public interface InputReader<T> {
+  /**
+   *
+   * Read in an instance.
+   *
+   * @return a tuple indicating whether we have hit EOF, and if not, the instance read
+   *
+   * @throws IOException
+   */
+  public Tuple<Boolean, T> read() throws IOException;
+  /**
+   *
+   * Close the reader.
+   *
+   * @param closeUnderLying whether to close the underlying inputstream
+   *
+   * @throws IOException
+   */
+  public void close(boolean closeUnderLying) throws IOException;
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/OutputWriter.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/OutputWriter.java
new file mode 100644
index 0000000..4395426
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/OutputWriter.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.ubercompressor;
+
+import java.io.IOException;
+/**
+ *
+ * OutputWriter. A writer for instances of type T.
+ *
+ * @param <T> the type
+ */
+public interface OutputWriter<T> {
+  /**
+   * Pass an instance to be written to the outputstream. The instance may be cached internally and
+   * may be actually written at some future time, but will be written by the time close returns.
+   * Null values can be legally written into the stream.
+   *
+   * @param t the instance
+   * @throws IOException
+   */
+  public void write(T t) throws IOException;
+  /**
+   * Close the writer. The writer will write and release any cached instances when closed.
+   *
+   * @param closeUnderlying propagate the close to the underlying stream
+   * @return the number of bits written, -1 if unknown
+   * @throws IOException
+   */
+  public long close(boolean closeUnderlying) throws IOException;
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/TypeSpecificCompressor.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/TypeSpecificCompressor.java
new file mode 100644
index 0000000..c4d908b
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/TypeSpecificCompressor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.ubercompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ *
+ * The interface for a type-specific compressor
+ * @param <T> the type
+ */
+public interface TypeSpecificCompressor<T> {
+
+  /**
+   * Create a writer for a stream of instances of this type
+   *
+   * @param out the outputstream to write into
+   * @return the writer instance
+   * @throws IOException
+   */
+  public OutputWriter<T> createOutputWriter(OutputStream out) throws IOException;
+
+  /**
+   *
+   * Create a reader for a stream of instances of this type
+   *
+   * @param in the inputstream to read from
+   * @param numBits the number of bits of data to consume from the inputstream, -1 if unknown
+   * @return the reader instance
+   * @throws IOException
+   */
+  public InputReader<T> createInputReader(InputStream in, long numBits)
+      throws IOException;
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionCodec.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionCodec.java
new file mode 100644
index 0000000..91836c3
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionCodec.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; + +public class UberCompressionCodec implements CompressionCodec, Configurable { + + @Override + public Compressor createCompressor() { + return new DummyCompressor(); + } + + private static class DummyCompressor implements Compressor { + + @Override + public void setInput(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean needsInput() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public long getBytesRead() { + throw new UnsupportedOperationException(); + } + + @Override + public long getBytesWritten() { + throw new UnsupportedOperationException(); + } + + @Override + public void finish() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean finished() { + throw new UnsupportedOperationException(); + } + + @Override + public int compress(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + } + + @Override + public void end() { + throw new UnsupportedOperationException(); + } + } + + @Override + public Decompressor createDecompressor() { + return new DummyDecompressor(); + } + + private static class DummyDecompressor implements Decompressor { + + @Override + public void setInput(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean needsInput() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean needsDictionary() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean finished() { + throw new UnsupportedOperationException(); + } + + @Override + public int decompress(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + } + + @Override + public void end() { + throw new UnsupportedOperationException(); + } + + } + + @Override + public Class getCompressorType() { + return DummyCompressor.class; + } + + @Override + public Class getDecompressorType() { + return DummyDecompressor.class; + } + + @Override + public CompressionInputStream createInputStream(InputStream arg0) throws IOException { + return new UberCompressionInputStream(conf, arg0); + } + + @Override + public CompressionInputStream createInputStream(InputStream arg0, Decompressor arg1) 
+ throws IOException { + return new UberCompressionInputStream(conf, arg0); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream arg0) throws IOException { + return new UberCompressionOutputStream(conf, arg0); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream arg0, Compressor arg1) + throws IOException { + return new UberCompressionOutputStream(conf, arg0); + } + + @Override + public String getDefaultExtension() { + return ".uz"; + } + + private Configuration conf; + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration arg0) { + conf = arg0; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionInputStream.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionInputStream.java new file mode 100644 index 0000000..9908130 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionInputStream.java @@ -0,0 +1,245 @@ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.SchemaAwareCompressionInputStream; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; + +public class UberCompressionInputStream extends SchemaAwareCompressionInputStream { + + private static final Log LOG = LogFactory.getLog(UberCompressionInputStream.class); + + private int mColumnNumber = -1; + + ByteStream.Output byteOut; + private byte[] buf; + + private Configuration conf; + + public UberCompressionInputStream(Configuration conf, InputStream arg0) throws IOException { + super(arg0); + this.conf = conf; + } + + @Override + public void setColumnIndex(int columnIndex) { + mColumnNumber = columnIndex; + } + + @Override + public void resetState() throws IOException { + } + + @Override + public int read(byte[] data, int offset, int length) throws IOException { + LOG.info("Reading " + length + " bytes for colindex " + mColumnNumber); + try { + if (mColumnNumber == -1) { + UberCompressorColumnConfig defaultConfig = new UberCompressorColumnConfig(conf); + CompressionCodec codec = defaultConfig.getCodec(); + CompressionInputStream cin = codec.createInputStream(in); + cin.resetState(); + readFully(data, offset, length, cin); + return length; + } + DataInput dataIn = new DataInputStream(in); + String 
tis = dataIn.readUTF(); + String colcon = dataIn.readUTF(); + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(tis); + UberCompressorColumnConfig columnConfig = new UberCompressorColumnConfig(mColumnNumber, + typeInfo, colcon, conf); + if (byteOut == null) { + byteOut = new ByteStream.Output(); + } else { + byteOut.reset(); + } + TypeSpecificCompressor compressor = columnConfig.getCompressor(); + if (compressor != null) { + ObjectInspector foi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo); + readFromCompressor(compressor, foi, typeInfo, length); + System.arraycopy(byteOut.getData(), 0, data, 0, length); + return length; + } else { + SerDe serde = columnConfig.getSerde(); + if (serde != null) { + int serializedLength = dataIn.readInt(); + enlargeBuffer(serializedLength); + CompressionCodec codec = columnConfig.getCodec(); + CompressionInputStream cin = codec.createInputStream(in); + readFully(buf, 0, serializedLength, cin); + DataInput din = new DataInputStream(new ByteArrayInputStream(buf, 0, serializedLength)); + Writable w = serde.getSerializedClass().newInstance(); + w.readFields(din); + Object o = serde.deserialize(w); + DataOutputStream byteStream = new DataOutputStream(byteOut); + StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector(); + Object list = soi.getStructFieldsDataAsList(o).get(0); + ListObjectInspector loi = (ListObjectInspector) soi.getAllStructFieldRefs().get(0) + .getFieldObjectInspector(); + ObjectInspector oi = loi.getListElementObjectInspector(); + for (Object element : loi.getList(list)) { + UberCompressorUtils.writeObject(element, oi, byteStream); + } + byteStream.close(); + System.arraycopy(byteOut.getData(), 0, data, 0, length); + return length; + } else { + CompressionCodec codec = columnConfig.getCodec(); + CompressionInputStream cin = codec.createInputStream(in); + cin.resetState(); + readFully(data, offset, length, cin); + return length; + } + } + } catch (SerDeException e) { + throw new IOException(e); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + } + + private void readFully(byte[] data, int offset, int length, CompressionInputStream cin) + throws IOException, EOFException { + int read = 0; + while (read < length) { + int read_now = cin.read(data, offset + read, length - read); + if (read_now < 0) { + throw new EOFException("Trying to read " + length + + " bytes from the compressed stream, got only " + read + " bytes."); + } else { + read += read_now; + } + } + } + + private void readFromCompressor(TypeSpecificCompressor compressor, ObjectInspector foi, + TypeInfo typeInfo, int length) throws IOException { + DataOutputStream dataOut = new DataOutputStream(byteOut); + InputReader ir = compressor.createInputReader(in, -1); + if (typeInfo instanceof PrimitiveTypeInfo) { + PrimitiveTypeInfo pcolTypeInfo = (PrimitiveTypeInfo) typeInfo; + switch (pcolTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case INT: + case SHORT: { + @SuppressWarnings("unchecked") + InputReader iir = (InputReader) ir; + while (byteOut.getCount() < length) { + Integer i = iir.read().getSecond(); + UberCompressorUtils.writeObject(i, foi, dataOut); + dataOut.flush(); + } + break; + } + case STRING: { + @SuppressWarnings("unchecked") + InputReader iir = (InputReader) ir; + while (byteOut.getCount() < length) { + String i = iir.read().getSecond(); + UberCompressorUtils.writeObject(i, foi, dataOut); + dataOut.flush(); + } + 
break; + } + default: { + throw new RuntimeException("Unsupported type: " + pcolTypeInfo); + } + } + } else { + if (typeInfo instanceof MapTypeInfo) { + @SuppressWarnings("unchecked") + InputReader> mapReader = (InputReader>) ir; + while (byteOut.getCount() < length) { + Map i = mapReader.read().getSecond(); + UberCompressorUtils.writeObject(i, foi, dataOut); + dataOut.flush(); + } + } else { + if (typeInfo instanceof ListTypeInfo) { + @SuppressWarnings("unchecked") + InputReader>> listmapReader = + (InputReader>>) ir; + while (byteOut.getCount() < length) { + List> i = listmapReader.read().getSecond(); + UberCompressorUtils.writeObject(i, foi, dataOut); + dataOut.flush(); + } + } else { + throw new RuntimeException("Unsupported type: " + typeInfo); + } + } + } + } + + private void enlargeBuffer(int length) { + if ((buf == null) || buf.length < length) { + buf = new byte[length]; + } + } + + @Override + public int read() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public long skip(long n) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int available() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized void mark(int readlimit) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized void reset() throws IOException { + } + + @Override + public boolean markSupported() { + return false; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionOutputStream.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionOutputStream.java new file mode 100644 index 0000000..1d5ac1a --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionOutputStream.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.SchemaAwareCompressionOutputStream; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionOutputStream; + +public class UberCompressionOutputStream extends SchemaAwareCompressionOutputStream { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(UberCompressionOutputStream.class); + + private static final String COMPRESSION_CONF = "hive.ubercompressor.config.column."; + + private final ByteStream.Output byteOut; + private final Configuration conf; + + private int mColumnNumber = -1; + private boolean resetPending = true; + private UberCompressorColumnConfig columnConfig; + + private OutputWriter mOW; + private CompressionOutputStream mCos; + + public UberCompressionOutputStream(Configuration conf, OutputStream arg0) throws IOException { + super(arg0); + byteOut = new ByteStream.Output(); + this.conf = conf; + } + + @Override + public void finish() throws IOException { + if (mColumnNumber != -1) { + doWrite(); + } + if (mOW != null) { + mOW.close(false); + } else { + if (mCos != null) { + mCos.finish(); + } + } + } + + @Override + public void resetState() throws IOException { + resetPending = true; + } + + @Override + public void setColumnIndex(int columnIndex) { + if (mColumnNumber != columnIndex) { + mOW = null; + mCos = null; + resetPending = true; + mColumnNumber = columnIndex; + columnConfig = null; + } + } + + private void initializeColumnConfig() throws SerDeException { + UberCompressorConfig uberCompressorConfig = UberCompressorConfig.getInstance(); + if (mColumnNumber != -1) { + TypeInfo ti = uberCompressorConfig.getColumnType(mColumnNumber); + String colConfig = conf.get(COMPRESSION_CONF + mColumnNumber); + if (colConfig != null) { + columnConfig = new UberCompressorColumnConfig( + uberCompressorConfig.getColumnName(mColumnNumber), + ti, colConfig, conf); + } else { + columnConfig = new UberCompressorColumnConfig( + uberCompressorConfig.getColumnName(mColumnNumber), ti, conf); + } + } else { + columnConfig = new UberCompressorColumnConfig(conf); + } + } + + + private void doReset() throws IOException { + try { + if (columnConfig == null) { + initializeColumnConfig(); + } + } catch 
(SerDeException e) { + throw new IOException(e); + } + if (mColumnNumber == -1) { + if (mCos != null) { + mCos.resetState(); + } else { + CompressionCodec codec = columnConfig.getCodec(); + mCos = codec.createOutputStream(out); + } + } else { + byteOut.reset(); + } + resetPending = false; + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + if (resetPending) { + doReset(); + } + if (mColumnNumber == -1) { + mCos.write(data, offset, length); + return; + } else { + byteOut.write(data, offset, length); + } + } + + private void doWrite() throws IOException { + try { + byte[] data = byteOut.getData(); + int length = byteOut.getCount(); + if (length == 0) { + return; // nothing to do + } else { + TypeInfo typeInfo = columnConfig.getColumnType(); + String tis = typeInfo.getTypeName(); + DataOutputStream dataOutput = new DataOutputStream(out); + dataOutput.writeUTF(tis); + String colcon = columnConfig.getColumnConfig(); + dataOutput.writeUTF(colcon); + dataOutput.flush(); + TypeSpecificCompressor compressor = columnConfig.getCompressor(); + ObjectInspector foi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo); + DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data, 0, length)); + if (compressor != null) { + mOW = compressor.createOutputWriter(out); + writeToCompressor(dataIn, foi, typeInfo); + } else { + SerDe serde = columnConfig.getSerde(); + if (serde != null) { + Object f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + ArrayList list = new ArrayList(); + while (f != null) { + list.add(f); + f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + } + StandardListObjectInspector loi = + ObjectInspectorFactory.getStandardListObjectInspector(foi); + Object o = loi.create(list.size()); + for (int index = 0; index < list.size(); index++) { + loi.set(o, index, list.get(index)); + } + StandardStructObjectInspector soi = columnConfig.getTypeInfoForSerde(); + Object s = soi.create(); + soi.setStructFieldData(s, soi.getAllStructFieldRefs().get(0), o); + Writable w = serde.serialize(s, soi); + byteOut.reset(); + DataOutput serialized = new DataOutputStream(byteOut); + w.write(serialized); + dataOutput.writeInt(byteOut.getCount()); + dataOutput.flush(); + CompressionCodec codec = columnConfig.getCodec(); + mCos = codec.createOutputStream(out); + mCos.write(byteOut.getData(), 0, byteOut.getCount()); + } else { + if (mCos != null) { + mCos.resetState(); + } else { + CompressionCodec codec = columnConfig.getCodec(); + mCos = codec.createOutputStream(out); + } + mCos.write(data, 0, length); + } + byteOut.reset(); + } + } + } catch (SerDeException e) { + throw new IOException(e); + } + } + + @SuppressWarnings("unchecked") + private void writeToCompressor(DataInput dataIn, ObjectInspector foi, TypeInfo colTypeInfo) + throws IOException { + if (colTypeInfo instanceof PrimitiveTypeInfo) { + PrimitiveTypeInfo pcolTypeInfo = (PrimitiveTypeInfo) colTypeInfo; + switch (pcolTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case INT: + case SHORT: { + OutputWriter writer = (OutputWriter) mOW; + switch (pcolTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: { + Object f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + while (f != null) { + boolean v = (Boolean) f; + writer.write(v ? 
1 : 0); + f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + } + return; + } + case BYTE: { + Object f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + while (f != null) { + byte v = (Byte) f; + writer.write((int) v); + f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + } + return; + } + case SHORT: { + Object f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + while (f != null) { + short v = (Short) f; + writer.write((int) v); + f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + } + return; + } + case INT: { + Object f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + while (f != null) { + int v = (Integer) f; + writer.write(v); + f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + } + return; + } + default: { + throw new RuntimeException("Unrecognized type: " + pcolTypeInfo.getPrimitiveCategory()); + } + } + } + case STRING: { + OutputWriter writer = (OutputWriter) mOW; + Object f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + while (f != null) { + String v = (String) f; + writer.write(v); + f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + } + return; + } + default: + throw new RuntimeException("Unrecognized type: " + pcolTypeInfo); + } + } else { + if (colTypeInfo instanceof MapTypeInfo) { + // currently support map of string/string only + OutputWriter> writer = (OutputWriter>) mOW; + Object f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + while (f != null) { + writer.write((Map) f); + f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + } + } else { + if (colTypeInfo instanceof ListTypeInfo) { + // currently support list of map of string/string only + OutputWriter>> writer = + (OutputWriter>>) mOW; + Object f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + while (f != null) { + writer.write((List>) f); + f = UberCompressorUtils.readObject(mColumnNumber, foi, dataIn); + } + } else { + throw new RuntimeException("Unsupported type for compressor:" + colTypeInfo); + } + } + } + } + + @Override + public void write(int b) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorColumnConfig.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorColumnConfig.java new file mode 100644 index 0000000..4289283 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorColumnConfig.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.util.ReflectionUtils; + +public class UberCompressorColumnConfig { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(UberCompressorColumnConfig.class); + + private String name; + private String columnConfig; + private TypeInfo typeInfo; + + private TypeSpecificCompressor compressor; + private CompressionCodec codec; + private SerDe serde; + private StandardStructObjectInspector structOfListOfItems; + + public UberCompressorColumnConfig(Configuration co) throws SerDeException { + this(-1, null, co); + } + + public UberCompressorColumnConfig(int columnNo, TypeInfo ti, Configuration co) throws SerDeException { + this("col#" + columnNo, ti, "", co); + } + + public UberCompressorColumnConfig(String name, TypeInfo ti, Configuration co) + throws SerDeException { + this(name, ti, "", co); + } + + UberCompressorColumnConfig(int columnNo, TypeInfo typeInfo, String columnConfig, Configuration co) + throws SerDeException { + this("col#" + columnNo, typeInfo, columnConfig, co); + } + + public UberCompressorColumnConfig(String name, TypeInfo ti, String columnConfig, Configuration co) + throws SerDeException { + this.name = name; + this.columnConfig = columnConfig; + this.typeInfo = ti; + initializeColumnConfig(co); + LOG.info("Column:" + this.name + " initialized as per config:" + this.columnConfig); + } + + private void initializeColumnConfig(Configuration co) + throws SerDeException { + try { + if (columnConfig.startsWith("compressor:")) { + String compressorName = columnConfig.substring(11); + compressor = (TypeSpecificCompressor) Class.forName(compressorName).newInstance(); + } else { + if (columnConfig.startsWith("codec:")) { + String codecName = columnConfig.substring(6); + int ci = codecName.indexOf(','); + if (ci != -1) { + String serdeName = codecName.substring(ci + 1); + serde = (SerDe) Class.forName(serdeName).newInstance(); + Properties tbl = new Properties(); + TypeInfo list = TypeInfoFactory.getListTypeInfo(typeInfo); + List names = Arrays.asList("listfield"); + List typeInfos = Arrays.asList(list); + TypeInfo struct = TypeInfoFactory.getStructTypeInfo(names, typeInfos); + structOfListOfItems = (StandardStructObjectInspector) + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(struct); + tbl.setProperty(Constants.LIST_COLUMNS, "listOf" + name); + tbl.setProperty(Constants.LIST_COLUMN_TYPES, list.getTypeName()); + tbl.put(Constants.ESCAPE_CHAR, "\n"); // For lazysimpleserde + serde.initialize(null, tbl); + codecName = codecName.substring(0, ci); + } + codec = (CompressionCodec) Class.forName(codecName).newInstance(); + ReflectionUtils.setConf(codec, co); + } else { + if (columnConfig.isEmpty()) { + columnConfig = 
"codec:org.apache.hadoop.io.compress.BZip2Codec"; + codec = new BZip2Codec(); + ReflectionUtils.setConf(codec, co); + } else { + throw new SerDeException("Unsupported type for compressor: " + columnConfig); + } + } + } + } catch (ClassNotFoundException e) { + throw new SerDeException(e); + } catch (IllegalAccessException e) { + throw new SerDeException(e); + } catch (InstantiationException e) { + throw new SerDeException(e); + } + } + + public TypeInfo getColumnType() { + return typeInfo; + } + + public String getColumnConfig() { + return columnConfig; + } + + @Override + public String toString() { + return "UberCompressorColumnConfig [name=" + name + + ", columnConfig=" + columnConfig + + ", typeInfo=" + typeInfo + + "]"; + } + + public TypeSpecificCompressor getCompressor() { + return compressor; + } + + public CompressionCodec getCodec() { + return codec; + } + + public SerDe getSerde() { + return serde; + } + + public StandardStructObjectInspector getTypeInfoForSerde() { + return structOfListOfItems; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorConfig.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorConfig.java new file mode 100644 index 0000000..c3ff898 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorConfig.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +public class UberCompressorConfig { + + private static final Log LOG = LogFactory.getLog(UberCompressorConfig.class); + + private StandardStructObjectInspector objectInspector; + private SerDeParameters serdeParams; + private int size; + + UberCompressorConfig(Configuration _unused, Properties tbl) throws SerDeException { + + serdeParams = new SerDeParameters(); + LazyUtils.extractColumnInfo(tbl, serdeParams, "UberCompressor"); + + List columnTypes = serdeParams.getColumnTypes(); + size = columnTypes.size(); + LOG.info("Initializing UberCompressor for schema : " + serdeParams.getColumnNames() + "/" + + columnTypes); + List fois = new ArrayList(size); + for (TypeInfo ti : columnTypes) { + ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(ti); + fois.add(oi); + } + objectInspector = + ObjectInspectorFactory.getStandardStructObjectInspector( + serdeParams.getColumnNames(), fois); + } + + public int getNumberOfColumns() { + return size; + } + + public String getColumnName(int col) { + return serdeParams.getColumnNames().get(col); + } + + public TypeInfo getColumnType(int col) { + return serdeParams.getColumnTypes().get(col); + } + + public StandardStructObjectInspector getObjectInspector() { + return objectInspector; + } + + private static UberCompressorConfig theInstance; + + public static UberCompressorConfig getInstance() { + return theInstance; + } + + public static void setInstance(UberCompressorConfig uberCompressorConfig) { + theInstance = uberCompressorConfig; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerde.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerde.java new file mode 100644 index 0000000..12d52a1 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerde.java @@ -0,0 +1,128 @@ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; + +public class UberCompressorSerde implements SerDe { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(UberCompressorSerde.class); + UberCompressorConfig uberCompressorConfig; + + // serialization related + BytesRefArrayWritable serializeCache; + BytesRefWritable field[]; + ByteStream.Output byteStream = new ByteStream.Output(); + DataOutputStream serializeStream = new DataOutputStream(byteStream); + + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + uberCompressorConfig = new UberCompressorConfig(conf, tbl); + int size = uberCompressorConfig.getNumberOfColumns(); + serializeCache = new BytesRefArrayWritable(size); + field = new BytesRefWritable[size]; + for (int i = 0; i < size; i++) { + field[i] = new BytesRefWritable(); + serializeCache.set(i, field[i]); + } + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + if (UberCompressorConfig.getInstance() == null) { + UberCompressorConfig.setInstance(uberCompressorConfig); + } + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + objInspector.getTypeName()); + } + + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + + byteStream.reset(); + int streamOffset = 0; + // Serialize each field + for (int i = 0; i < fields.size(); i++) { + ObjectInspector foi = fields.get(i).getFieldObjectInspector(); + Object f = (list == null ? 
null : list.get(i)); + try { + if (f != null) { + UberCompressorUtils.writeObject(f, foi, serializeStream); + serializeStream.flush(); + field[i].set(byteStream.getData(), streamOffset, byteStream.getCount() - streamOffset); + streamOffset = byteStream.getCount(); + } else { + field[i].set(byteStream.getData(), streamOffset, 0); + } + } catch (IOException e) { + throw new SerDeException(e); + } + } + return serializeCache; + } + + @Override + public Object deserialize(Writable blob) throws SerDeException { + if (!(blob instanceof BytesRefArrayWritable)) { + throw new SerDeException(getClass().toString() + ": expects BytesRefArrayWritable!"); + } + try { + BytesRefArrayWritable cols = (BytesRefArrayWritable) blob; + StandardStructObjectInspector soi = (StandardStructObjectInspector) getObjectInspector(); + List fields = soi.getAllStructFieldRefs(); + Object struct = soi.create(); + for (int index = 0; index < fields.size(); ++index) { + StructField sf = fields.get(index); + BytesRefWritable brw = cols.get(index); + ByteArrayInputStream bais = new ByteArrayInputStream(brw.getData(), brw.getStart(), + brw.getLength()); + DataInput dataIn = new DataInputStream(bais); + Object f = UberCompressorUtils.readObject(index, sf.getFieldObjectInspector(), dataIn); + soi.setStructFieldData(struct, sf, f); + } + return struct; + } catch (IOException e) { + throw new SerDeException(e); + } + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return uberCompressorConfig.getObjectInspector(); + } + + @Override + public SerDeStats getSerDeStats() { + //FIXME + return null; + } + + @Override + public Class getSerializedClass() { + return BytesRefArrayWritable.class; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorUtils.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorUtils.java new file mode 100644 index 0000000..c15698b --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorUtils.java @@ -0,0 +1,131 @@ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyArray; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyListObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + + +public class UberCompressorUtils { + + private static final Log LOG = LogFactory.getLog(UberCompressorUtils.class); + + private UberCompressorUtils() { + } + + public static void writeInt(OutputStream byteStream, int v) throws IOException { + byteStream.write((v >>> 24) & 0xFF); + byteStream.write((v >>> 16) & 0xFF); + byteStream.write((v >>> 8) & 0xFF); + byteStream.write((v >>> 0) & 0xFF); + } + + public static int readInt(InputStream byteStream) throws IOException { + int rc = 0; + for (int j = 0; j < 4; ++j) { + int b = byteStream.read(); + if (b == -1) { + throw new EOFException(); + } + rc = (rc << 8) | (b & 0xFF); + } + return rc; + } + + + private static SerDeParameters serdeParams; + private static ByteStream.Output bout = new ByteStream.Output(); + private static ByteArrayRef bar = new ByteArrayRef(); + + public static void writeObject(Object obj, ObjectInspector objInspector, DataOutput out) + throws IOException { + init(); + if (obj == null) { // nulls are handled via a 0-length value + return; + } + bout.reset(); + LazySimpleSerDe.serialize(bout, obj, objInspector, serdeParams.getSeparators(), 1, + serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(), + serdeParams.getNeedsEscape()); + bout.write(serdeParams.getSeparators()[0]); + out.write(bout.getData(), 0, bout.getCount()); + } + + private static void init() throws IOException { + if (serdeParams == null) { + Properties props = new Properties(); + props.put(Constants.ESCAPE_CHAR, "\n"); + try { + serdeParams = LazySimpleSerDe.initSerdeParams(null, props, "uber"); + } catch (SerDeException e) { + throw new IOException(e); + } + } + } + + static Map, Converter>> cachedLazyObjects = + new HashMap, Converter>>(); + + private static Tuple, Converter> getCachedTuple(int colNo, ObjectInspector oi) { + Tuple, Converter> rc = cachedLazyObjects.get(colNo); + if (rc == null) { + ObjectInspector lazyOI = LazyFactory.createLazyObjectInspector( + TypeInfoUtils.getTypeInfoFromObjectInspector(oi), + serdeParams.getSeparators(), 1, serdeParams.getNullSequence(), serdeParams.isEscaped(), + serdeParams.getEscapeChar()); + LazyObject lo = LazyFactory.createLazyObject(lazyOI); + Converter co = ObjectInspectorConverters.getConverter(lazyOI, oi); + rc = new Tuple, Converter>(lo, co); + cachedLazyObjects.put(colNo, rc); + } + return rc; + } + + public static Object readObject(int columnNo, ObjectInspector objInspector, DataInput in) + throws IOException { + try { + init(); + byte sep = serdeParams.getSeparators()[0]; + bout.reset(); + byte b = in.readByte(); + while (b != sep) { + bout.write(b); + b = in.readByte(); + } + bar.setData(bout.getData()); + Tuple, Converter> loTuple = getCachedTuple(columnNo, objInspector); + loTuple.getFirst().init(bar, 0, bout.getCount()); + Object stdO = loTuple.getSecond().convert(loTuple.getFirst()); + return stdO; + } catch (EOFException e) { + return null; + } + } +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/DummyIntegerCompressor.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/DummyIntegerCompressor.java new file mode 100644 index 0000000..8b2e807 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/DummyIntegerCompressor.java @@ -0,0 +1,75 @@ +package org.apache.hadoop.hive.contrib.ubercompressor.compressors; + +import java.io.EOFException; +import 
java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hive.contrib.ubercompressor.InputReader;
+import org.apache.hadoop.hive.contrib.ubercompressor.OutputWriter;
+import org.apache.hadoop.hive.contrib.ubercompressor.TypeSpecificCompressor;
+import org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorUtils;
+import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple;
+
+public class DummyIntegerCompressor implements TypeSpecificCompressor<Integer> {
+
+  private static class MyOutputWriter implements OutputWriter<Integer> {
+    private OutputStream mOut;
+
+    MyOutputWriter(OutputStream out) {
+      mOut = out;
+    }
+
+    @Override
+    public void write(Integer t) throws IOException {
+      UberCompressorUtils.writeInt(mOut, t);
+    }
+
+    @Override
+    public long close(boolean closeUnderlying) throws IOException {
+      if (closeUnderlying) {
+        mOut.close();
+      } else {
+        mOut.flush();
+      }
+      return -1;
+    }
+  }
+
+  private static class MyInputReader implements InputReader<Integer> {
+
+    private InputStream mIn;
+
+    public MyInputReader(InputStream in) {
+      mIn = in;
+    }
+
+    @Override
+    public Tuple<Boolean, Integer> read() throws IOException {
+      try {
+        return new Tuple<Boolean, Integer>(false, UberCompressorUtils.readInt(mIn));
+      } catch (EOFException e) {
+        return new Tuple<Boolean, Integer>(true, 0);
+      }
+    }
+
+    @Override
+    public void close(boolean closeUnderLying) throws IOException {
+      if (closeUnderLying) {
+        mIn.close();
+      }
+    }
+
+  }
+
+  @Override
+  public OutputWriter<Integer> createOutputWriter(OutputStream out) throws IOException {
+    return new MyOutputWriter(out);
+  }
+
+  @Override
+  public InputReader<Integer> createInputReader(InputStream in, long numBits) throws IOException {
+    return new MyInputReader(in);
+  }
+
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/dsalg/Tuple.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/dsalg/Tuple.java
new file mode 100644
index 0000000..5275f36
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/dsalg/Tuple.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.ubercompressor.dsalg;
+
+/**
+ *
+ * Tuple.
A simple pair class + * + * @param + * @param + */ +public class Tuple { + private T1 mT1; + private T2 mT2; + + public Tuple(T1 t1, T2 t2) { + mT1 = t1; + mT2 = t2; + } + + public T1 getFirst() { + return mT1; + } + + public T2 getSecond() { + return mT2; + } + + public void setFirst(T1 t1) { + mT1 = t1; + } + + public void setSecond(T2 t2) { + mT2 = t2; + } +} diff --git contrib/src/test/queries/clientpositive/ubercompressor.q contrib/src/test/queries/clientpositive/ubercompressor.q new file mode 100644 index 0000000..80f636f --- /dev/null +++ contrib/src/test/queries/clientpositive/ubercompressor.q @@ -0,0 +1,19 @@ +ADD JAR ${system:build.dir}/hive-contrib-${system:hive.version}.jar; + +set hive.exec.compress.output=true; +set mapred.output.compression.codec=org.apache.hadoop.hive.contrib.ubercompressor.UberCompressionCodec; +set hive.ubercompressor.config.column.1=codec:org.apache.hadoop.io.compress.GzipCodec; +set hive.ubercompressor.config.column.2=codec:org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; +set hive.ubercompressor.config.column.3=compressor:org.apache.hadoop.hive.contrib.ubercompressor.compressors.DummyIntegerCompressor; +set hive.ubercompressor.config.column.4=codec:org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; + +CREATE TABLE table_ubercompressor(a array, b array, c map, d int, e string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorSerde' +STORED AS RCFILE; + +FROM src_thrift +INSERT OVERWRITE TABLE table_ubercompressor SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring; + +SELECT * FROM table_ubercompressor; + diff --git contrib/src/test/results/clientpositive/ubercompressor.q.out contrib/src/test/results/clientpositive/ubercompressor.q.out new file mode 100644 index 0000000..feb0985 --- /dev/null +++ contrib/src/test/results/clientpositive/ubercompressor.q.out @@ -0,0 +1,50 @@ +PREHOOK: query: CREATE TABLE table_ubercompressor(a array, b array, c map, d int, e string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorSerde' +STORED AS RCFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE table_ubercompressor(a array, b array, c map, d int, e string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorSerde' +STORED AS RCFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_ubercompressor +PREHOOK: query: FROM src_thrift +INSERT OVERWRITE TABLE table_ubercompressor SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring +PREHOOK: type: QUERY +PREHOOK: Input: default@src_thrift +PREHOOK: Output: default@table_ubercompressor +POSTHOOK: query: FROM src_thrift +INSERT OVERWRITE TABLE table_ubercompressor SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_thrift +POSTHOOK: Output: default@table_ubercompressor +POSTHOOK: Lineage: table_ubercompressor.a SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lint, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.b SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lstring, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.c SIMPLE [(src_thrift)src_thrift.FieldSchema(name:mstringstring, type:map, comment:from deserializer), ] +POSTHOOK: 
Lineage: table_ubercompressor.d SIMPLE [(src_thrift)src_thrift.FieldSchema(name:aint, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.e SIMPLE [(src_thrift)src_thrift.FieldSchema(name:astring, type:string, comment:from deserializer), ] +PREHOOK: query: SELECT * FROM table_ubercompressor +PREHOOK: type: QUERY +PREHOOK: Input: default@table_ubercompressor +PREHOOK: Output: file:/var/folders/67/67R3POPtF90VG63KSmCbcU++F0U/-Tmp-/krishnak/hive_2011-12-08_01-19-21_716_5568709994192411459/-mr-10000 +POSTHOOK: query: SELECT * FROM table_ubercompressor +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_ubercompressor +POSTHOOK: Output: file:/var/folders/67/67R3POPtF90VG63KSmCbcU++F0U/-Tmp-/krishnak/hive_2011-12-08_01-19-21_716_5568709994192411459/-mr-10000 +POSTHOOK: Lineage: table_ubercompressor.a SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lint, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.b SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lstring, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.c SIMPLE [(src_thrift)src_thrift.FieldSchema(name:mstringstring, type:map, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.d SIMPLE [(src_thrift)src_thrift.FieldSchema(name:aint, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.e SIMPLE [(src_thrift)src_thrift.FieldSchema(name:astring, type:string, comment:from deserializer), ] +[0,0,0] ["0","0","0"] {"key_9":"value_9"} 1712634731 record_0 +[1,2,3] ["10","100","1000"] {"key_9":"value_9"} 465985200 record_1 +[2,4,6] ["20","200","2000"] {"key_9":"value_9"} -751827638 record_2 +[3,6,9] ["30","300","3000"] {"key_9":"value_9"} 477111222 record_3 +[4,8,12] ["40","400","4000"] {"key_9":"value_9"} -734328909 record_4 +[5,10,15] ["50","500","5000"] {"key_9":"value_9"} -1952710710 record_5 +[6,12,18] ["60","600","6000"] {"key_9":"value_9"} 1244525190 record_6 +[7,14,21] ["70","700","7000"] {"key_9":"value_9"} -1461153973 record_7 +[8,16,24] ["80","800","8000"] {"key_9":"value_9"} 1638581578 record_8 +[9,18,27] ["90","900","9000"] {"key_9":"value_9"} 336964413 record_9 +null null null 0 NULL
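
For reference, the round trip implied by the TypeSpecificCompressor contract can also be exercised outside of Hive against the DummyIntegerCompressor added above. The sketch below is an illustration only, not part of the patch: the driver class name is made up, and it assumes the generic signatures shown in the interfaces above.

// Standalone illustration (not part of the patch): round-trips a few ints
// through DummyIntegerCompressor's writer and reader.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.contrib.ubercompressor.InputReader;
import org.apache.hadoop.hive.contrib.ubercompressor.OutputWriter;
import org.apache.hadoop.hive.contrib.ubercompressor.TypeSpecificCompressor;
import org.apache.hadoop.hive.contrib.ubercompressor.compressors.DummyIntegerCompressor;
import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple;

public class DummyIntegerCompressorRoundTrip {
  public static void main(String[] args) throws IOException {
    TypeSpecificCompressor<Integer> compressor = new DummyIntegerCompressor();

    // Write a handful of ints through the type-specific writer.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    OutputWriter<Integer> writer = compressor.createOutputWriter(bytes);
    for (int i = 0; i < 5; i++) {
      writer.write(i * 10);
    }
    writer.close(false); // flush, but leave the underlying stream open

    // Read the values back; the tuple's first element is the EOF flag.
    InputReader<Integer> reader =
        compressor.createInputReader(new ByteArrayInputStream(bytes.toByteArray()), -1);
    Tuple<Boolean, Integer> next = reader.read();
    while (!next.getFirst()) {
      System.out.println(next.getSecond());
      next = reader.read();
    }
    reader.close(true);
  }
}

The first element of the returned Tuple is the EOF flag, so the read loop stops once getFirst() is true; close(false) on the writer flushes but leaves the underlying stream open, matching how UberCompressionOutputStream.finish() closes its writer without closing the wrapped stream.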