diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/InputReader.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/InputReader.java
new file mode 100644
index 0000000..de620a9
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/InputReader.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.ubercompressor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple;
+
+/**
+ *
+ * InputReader. A reader for instances of type T.
+ *
+ * @param <T> the type
+ */
+public interface InputReader<T> {
+  /**
+   *
+   * Read in an instance.
+   *
+   * @return a tuple indicating whether we have hit EOF, and if not, the instance read
+   *
+   * @throws IOException
+   */
+  public Tuple<Boolean, T> read() throws IOException;
+  /**
+   *
+   * Close the reader.
+   *
+   * @param closeUnderLying whether to close the underlying inputstream
+   *
+   * @throws IOException
+   */
+  public void close(boolean closeUnderLying) throws IOException;
+}
\ No newline at end of file
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/OutputWriter.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/OutputWriter.java
new file mode 100644
index 0000000..a9390e0
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/OutputWriter.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.ubercompressor;
+
+import java.io.IOException;
+/**
+ *
+ * OutputWriter. A writer for instances of type T.
+ *
+ * @param <T> the type
+ */
+public interface OutputWriter<T> {
+  /**
+   * Pass an instance to be written to the outputstream. The instance may be cached internally and
+   * may be actually written at some future time, but will be written by the time close returns.
+   * Null values can be legally written into the stream.
+   *
+   * @param t the instance
+   * @throws IOException
+   */
+  public void write(T t) throws IOException;
+  /**
+   * Close the writer. The writer will write and release any cached instances when closed.
+   *
+   * @param closeUnderlying propagate the close to the underlying stream
+   * @return the number of bits written, -1 if unknown
+   * @throws IOException
+   */
+  public long close(boolean closeUnderlying) throws IOException;
+}
\ No newline at end of file
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/TypeSpecificCompressor.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/TypeSpecificCompressor.java
new file mode 100644
index 0000000..c4d908b
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/TypeSpecificCompressor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.ubercompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ *
+ * The interface for a type-specific compressor.
+ * @param <T> the type
+ */
+public interface TypeSpecificCompressor<T> {
+
+  /**
+   * Create a writer for a stream of instances of this type.
+   *
+   * @param out the outputstream to write into
+   * @return the writer instance
+   * @throws IOException
+   */
+  public OutputWriter<T> createOutputWriter(OutputStream out) throws IOException;
+
+  /**
+   *
+   * Create a reader for a stream of instances of this type.
+   *
+   * @param in the inputstream to read from
+   * @param numBits the number of bits of data to consume from the inputstream, -1 if unknown
+   * @return the reader instance
+   * @throws IOException
+   */
+  public InputReader<T> createInputReader(InputStream in, long numBits)
+      throws IOException;
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionCodec.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionCodec.java
new file mode 100644
index 0000000..91836c3
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionCodec.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; + +public class UberCompressionCodec implements CompressionCodec, Configurable { + + @Override + public Compressor createCompressor() { + return new DummyCompressor(); + } + + private static class DummyCompressor implements Compressor { + + @Override + public void setInput(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean needsInput() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public long getBytesRead() { + throw new UnsupportedOperationException(); + } + + @Override + public long getBytesWritten() { + throw new UnsupportedOperationException(); + } + + @Override + public void finish() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean finished() { + throw new UnsupportedOperationException(); + } + + @Override + public int compress(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + } + + @Override + public void end() { + throw new UnsupportedOperationException(); + } + } + + @Override + public Decompressor createDecompressor() { + return new DummyDecompressor(); + } + + private static class DummyDecompressor implements Decompressor { + + @Override + public void setInput(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean needsInput() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean needsDictionary() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean finished() { + throw new UnsupportedOperationException(); + } + + @Override + public int decompress(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + } + + @Override + public void end() { + throw new UnsupportedOperationException(); + } + + } + + @Override + public Class getCompressorType() { + return DummyCompressor.class; + } + + @Override + public Class getDecompressorType() { + return DummyDecompressor.class; + } + + @Override + public CompressionInputStream createInputStream(InputStream arg0) throws IOException { + return new UberCompressionInputStream(conf, arg0); + } + + @Override + public CompressionInputStream createInputStream(InputStream arg0, Decompressor arg1) 
+ throws IOException { + return new UberCompressionInputStream(conf, arg0); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream arg0) throws IOException { + return new UberCompressionOutputStream(conf, arg0); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream arg0, Compressor arg1) + throws IOException { + return new UberCompressionOutputStream(conf, arg0); + } + + @Override + public String getDefaultExtension() { + return ".uz"; + } + + private Configuration conf; + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration arg0) { + conf = arg0; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionInputStream.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionInputStream.java new file mode 100644 index 0000000..ee8f712 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionInputStream.java @@ -0,0 +1,246 @@ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.SchemaAwareCompressionInputStream; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; + +public class UberCompressionInputStream extends SchemaAwareCompressionInputStream { + + private static final Log LOG = LogFactory.getLog(UberCompressionInputStream.class); + + private int mColumnNumber = -1; + + ByteStream.Output byteOut; + private byte[] buf; + + public UberCompressionInputStream(Configuration conf, InputStream arg0) throws IOException { + super(arg0); + } + + @Override + public void setColumnIndex(int columnIndex) { + mColumnNumber = columnIndex; + } + + @Override + public void resetState() throws IOException { + } + + @Override + public int read(byte[] data, int offset, int length) throws IOException { + LOG.info("Reading " + length + " bytes for colindex " + mColumnNumber); + try { + if (mColumnNumber == -1) { + UberCompressorColumnConfig defaultConfig = new UberCompressorColumnConfig(); + CompressionCodec codec = defaultConfig.getCodec(); + CompressionInputStream cin = codec.createInputStream(in); + cin.resetState(); + int read = 0; + while (read < length) { + int read_now = cin.read(data, offset + read, length - read); + if (read_now < 0) { + throw new EOFException("Trying to read " + 
length + + " bytes from the compressed stream, got only " + read + " bytes."); + } else { + read += read_now; + } + } + return length; + } + DataInput dataIn = new DataInputStream(in); + String tis = dataIn.readUTF(); + String colcon = dataIn.readUTF(); + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(tis); + UberCompressorColumnConfig columnConfig = new UberCompressorColumnConfig(typeInfo, colcon); + if (byteOut == null) { + byteOut = new ByteStream.Output(); + } else { + byteOut.reset(); + } + TypeSpecificCompressor compressor = columnConfig.getCompressor(); + if (compressor != null) { + ObjectInspector foi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo); + readFromCompressor(compressor, foi, typeInfo, length); + System.arraycopy(byteOut.getData(), 0, data, 0, length); + return length; + } else { + SerDe serde = columnConfig.getSerde(); + if (serde != null) { + int serializedLength = dataIn.readInt(); + enlargeBuffer(serializedLength); + CompressionCodec codec = columnConfig.getCodec(); + CompressionInputStream cin = codec.createInputStream(in); + cin.resetState(); + cin.read(buf, 0, serializedLength); + DataInput din = new DataInputStream(new ByteArrayInputStream(buf, 0, serializedLength)); + Writable w = serde.getSerializedClass().newInstance(); + w.readFields(din); + Object o = serde.deserialize(w); + DataOutputStream byteStream = new DataOutputStream(byteOut); + StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector(); + Object list = soi.getStructFieldsDataAsList(o).get(0); + ListObjectInspector loi = (ListObjectInspector) soi.getAllStructFieldRefs().get(0) + .getFieldObjectInspector(); + ObjectInspector oi = loi.getListElementObjectInspector(); + for (Object element : loi.getList(list)) { + UberCompressorUtils.writeObject(element, oi, byteStream); + } + byteStream.close(); + System.arraycopy(byteOut.getData(), 0, data, 0, length); + return length; + } else { + CompressionCodec codec = columnConfig.getCodec(); + CompressionInputStream cin = codec.createInputStream(in); + cin.resetState(); + int read = 0; + while (read < length) { + int read_now = cin.read(data, offset + read, length - read); + if (read_now < 0) { + throw new EOFException("Trying to read " + length + + " bytes from the compressed stream, got only " + read + " bytes."); + } else { + read += read_now; + } + } + return length; + } + } + } catch (SerDeException e) { + throw new IOException(e); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + } + + private void readFromCompressor(TypeSpecificCompressor compressor, ObjectInspector foi, + TypeInfo typeInfo, int length) throws IOException { + DataOutputStream dataOut = new DataOutputStream(byteOut); + InputReader ir = compressor.createInputReader(in, -1); + if (typeInfo instanceof PrimitiveTypeInfo) { + PrimitiveTypeInfo pcolTypeInfo = (PrimitiveTypeInfo) typeInfo; + switch (pcolTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case INT: + case SHORT: { + @SuppressWarnings("unchecked") + InputReader iir = (InputReader) ir; + while (byteOut.getCount() < length) { + Integer i = iir.read().getSecond(); + UberCompressorUtils.writeObject(i, foi, dataOut); + dataOut.flush(); + } + break; + } + case STRING: { + @SuppressWarnings("unchecked") + InputReader iir = (InputReader) ir; + while (byteOut.getCount() < length) { + String i = iir.read().getSecond(); + UberCompressorUtils.writeObject(i, foi, dataOut); + 
dataOut.flush(); + } + break; + } + default: { + throw new RuntimeException("Unsupported type: " + pcolTypeInfo); + } + } + } else { + if (typeInfo instanceof MapTypeInfo) { + @SuppressWarnings("unchecked") + InputReader> mapReader = (InputReader>) ir; + while (byteOut.getCount() < length) { + Map i = mapReader.read().getSecond(); + UberCompressorUtils.writeObject(i, foi, dataOut); + dataOut.flush(); + } + } else { + if (typeInfo instanceof ListTypeInfo) { + @SuppressWarnings("unchecked") + InputReader>> listmapReader = + (InputReader>>) ir; + while (byteOut.getCount() < length) { + List> i = listmapReader.read().getSecond(); + UberCompressorUtils.writeObject(i, foi, dataOut); + dataOut.flush(); + } + } else { + throw new RuntimeException("Unsupported type: " + typeInfo); + } + } + } + } + + private void enlargeBuffer(int length) { + if ((buf == null) || buf.length < length) { + buf = new byte[length]; + } + } + + @Override + public int read() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public long skip(long n) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int available() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized void mark(int readlimit) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized void reset() throws IOException { + } + + @Override + public boolean markSupported() { + return false; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionOutputStream.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionOutputStream.java new file mode 100644 index 0000000..9e3482d --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionOutputStream.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.SchemaAwareCompressionOutputStream; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionOutputStream; + +public class UberCompressionOutputStream extends SchemaAwareCompressionOutputStream { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(UberCompressionOutputStream.class); + + private CompressionOutputStream mCos; + private SerDe mSerde; + private ByteStream.Output byteOut; + private OutputWriter mOW; + + private int mColumnNumber = -1; + private boolean resetPending = true; + + public UberCompressionOutputStream(Configuration conf, OutputStream arg0) throws IOException { + super(arg0); + byteOut = new ByteStream.Output(); + } + + @Override + public void finish() throws IOException { + if (mColumnNumber != -1) { + doWrite(); + } + if (mOW != null) { + mOW.close(false); + } else { + if (mCos != null) { + mCos.finish(); + } + } + } + + @Override + public void resetState() throws IOException { + resetPending = true; + } + + @Override + public void setColumnIndex(int columnIndex) { + if (mColumnNumber != columnIndex) { + mOW = null; + mCos = null; + mSerde = null; + resetPending = true; + mColumnNumber = columnIndex; + } + } + + private void doReset() throws IOException { + UberCompressorConfig uberCompressorConfig = UberCompressorConfig.getInstance(); + UberCompressorColumnConfig header = uberCompressorConfig.getUberColumnConfig(mColumnNumber); + TypeSpecificCompressor compressor = header.getCompressor(); + if (compressor != null) { + mOW = compressor.createOutputWriter(out); + } else { + if (mCos != null) { + mCos.resetState(); + } else { + CompressionCodec codec = header.getCodec(); + mCos = codec.createOutputStream(out); + mSerde = header.getSerde(); + } + } + byteOut.reset(); + resetPending = false; + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + if (resetPending) { + doReset(); + } + if (mColumnNumber == -1) { + mCos.write(data, offset, length); + return; + } + byteOut.write(data, offset, length); + } + + private void doWrite() throws IOException { + try { + UberCompressorConfig uberCompressorConfig = UberCompressorConfig.getInstance(); 
+ UberCompressorColumnConfig header = uberCompressorConfig.getUberColumnConfig(mColumnNumber); + TypeInfo typeInfo = header.getColumnType(); + String tis = typeInfo.getTypeName(); + DataOutputStream dataOutput = new DataOutputStream(out); + dataOutput.writeUTF(tis); + String colcon = header.getColumnConfig(); + dataOutput.writeUTF(colcon); + dataOutput.flush(); + byte[] data = byteOut.getData(); + int length = byteOut.getCount(); + if (length == 0) { + return; // nothing to do + } else { + ObjectInspector foi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo); + DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data, 0, length)); + if (mOW != null) { + writeToCompressor(dataIn, foi, typeInfo); + } else { + if (mSerde != null) { + Object f = UberCompressorUtils.readObject(foi, dataIn); + ArrayList list = new ArrayList(); + while (f != null) { + list.add(f); + f = UberCompressorUtils.readObject(foi, dataIn); + } + StandardListObjectInspector loi = + ObjectInspectorFactory.getStandardListObjectInspector(foi); + Object o = loi.create(list.size()); + for (int index = 0; index < list.size(); index++) { + loi.set(o, index, list.get(index)); + } + StandardStructObjectInspector soi = header.getTypeInfoForSerde(); + Object s = soi.create(); + soi.setStructFieldData(s, soi.getAllStructFieldRefs().get(0), o); + Writable w = mSerde.serialize(s, soi); + byteOut.reset(); + DataOutput serialized = new DataOutputStream(byteOut); + w.write(serialized); + dataOutput.writeInt(byteOut.getCount()); + dataOutput.flush(); + mCos.resetState(); + mCos.write(byteOut.getData(), 0, byteOut.getCount()); + mCos.finish(); + } else { + mCos.write(data, 0, length); + } + } + } + byteOut.reset(); + } catch (SerDeException e) { + throw new IOException(e); + } + } + + @SuppressWarnings("unchecked") + private void writeToCompressor(DataInput dataIn, ObjectInspector foi, TypeInfo colTypeInfo) + throws IOException { + if (colTypeInfo instanceof PrimitiveTypeInfo) { + PrimitiveTypeInfo pcolTypeInfo = (PrimitiveTypeInfo) colTypeInfo; + switch (pcolTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case INT: + case SHORT: { + OutputWriter writer = (OutputWriter) mOW; + switch (pcolTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: { + Object f = UberCompressorUtils.readObject(foi, dataIn); + while (f != null) { + boolean v = (Boolean) f; + writer.write(v ? 
1 : 0); + f = UberCompressorUtils.readObject(foi, dataIn); + } + return; + } + case BYTE: { + Object f = UberCompressorUtils.readObject(foi, dataIn); + while (f != null) { + byte v = (Byte) f; + writer.write((int) v); + f = UberCompressorUtils.readObject(foi, dataIn); + } + return; + } + case SHORT: { + Object f = UberCompressorUtils.readObject(foi, dataIn); + while (f != null) { + short v = (Short) f; + writer.write((int) v); + f = UberCompressorUtils.readObject(foi, dataIn); + } + return; + } + case INT: { + Object f = UberCompressorUtils.readObject(foi, dataIn); + while (f != null) { + int v = (Integer) f; + writer.write(v); + f = UberCompressorUtils.readObject(foi, dataIn); + } + return; + } + default: { + throw new RuntimeException("Unrecognized type: " + pcolTypeInfo.getPrimitiveCategory()); + } + } + } + case STRING: { + OutputWriter writer = (OutputWriter) mOW; + Object f = UberCompressorUtils.readObject(foi, dataIn); + while (f != null) { + String v = (String) f; + writer.write(v); + f = UberCompressorUtils.readObject(foi, dataIn); + } + return; + } + default: + throw new RuntimeException("Unrecognized type: " + pcolTypeInfo); + } + } else { + if (colTypeInfo instanceof MapTypeInfo) { + // currently support map of string/string only + OutputWriter> writer = (OutputWriter>) mOW; + Object f = UberCompressorUtils.readObject(foi, dataIn); + while (f != null) { + writer.write((Map) f); + f = UberCompressorUtils.readObject(foi, dataIn); + } + } else { + if (colTypeInfo instanceof ListTypeInfo) { + // currently support list of map of string/string only + OutputWriter>> writer = + (OutputWriter>>) mOW; + Object f = UberCompressorUtils.readObject(foi, dataIn); + while (f != null) { + writer.write((List>) f); + f = UberCompressorUtils.readObject(foi, dataIn); + } + } else { + throw new RuntimeException("Unsupported type for compressor:" + colTypeInfo); + } + } + } + } + + @Override + public void write(int b) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorColumnConfig.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorColumnConfig.java new file mode 100644 index 0000000..c8d2fed --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorColumnConfig.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; + +public class UberCompressorColumnConfig { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(UberCompressorColumnConfig.class); + + private String name; + private String columnConfig; + private TypeInfo typeInfo; + + private TypeSpecificCompressor compressor; + private CompressionCodec codec; + private SerDe serde; + private StandardStructObjectInspector structOfListOfItems; + + public UberCompressorColumnConfig() throws SerDeException { + this(null); + } + + public UberCompressorColumnConfig(TypeInfo ti) throws SerDeException { + this(ti, ""); + } + + public UberCompressorColumnConfig(String name, TypeInfo ti) throws SerDeException { + this(name, ti, ""); + } + + UberCompressorColumnConfig(TypeInfo typeInfo, String columnConfig) throws SerDeException { + this("unknown", typeInfo, columnConfig); + } + + public UberCompressorColumnConfig(String name, TypeInfo ti, String columnConfig) + throws SerDeException { + this.name = name; + this.columnConfig = columnConfig; + this.typeInfo = ti; + initializeColumnConfig(); + } + + private void initializeColumnConfig() + throws SerDeException { + try { + if (columnConfig.startsWith("compressor:")) { + String compressorName = columnConfig.substring(11); + compressor = (TypeSpecificCompressor) Class.forName(compressorName).newInstance(); + } else { + if (columnConfig.startsWith("codec:")) { + String codecName = columnConfig.substring(6); + int ci = codecName.indexOf(','); + if (ci != -1) { + String serdeName = codecName.substring(ci+1); + serde = (SerDe) Class.forName(serdeName).newInstance(); + Properties tbl = new Properties(); + TypeInfo list = TypeInfoFactory.getListTypeInfo(typeInfo); + List names = Arrays.asList("listfield"); + List typeInfos = Arrays.asList(list); + TypeInfo struct = TypeInfoFactory.getStructTypeInfo(names, typeInfos); + structOfListOfItems = (StandardStructObjectInspector) + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(struct); + tbl.setProperty(Constants.LIST_COLUMNS, "listOf" + name); + tbl.setProperty(Constants.LIST_COLUMN_TYPES, list.getTypeName()); + tbl.put(Constants.ESCAPE_CHAR, "\n"); // For lazysimpleserde + serde.initialize(null, tbl); + codecName = codecName.substring(0, ci); + } + codec = (CompressionCodec) Class.forName(codecName).newInstance(); + } else { + if (columnConfig.isEmpty()) { + columnConfig = "codec:org.apache.hadoop.io.compress.BZip2Codec"; + codec = new BZip2Codec(); + } else { + throw new SerDeException("Unsupported type for compressor: " + columnConfig); + } + } + } + } catch (ClassNotFoundException e) { + throw new SerDeException(e); + } catch (IllegalAccessException e) { + throw new SerDeException(e); + } catch (InstantiationException e) { + throw new SerDeException(e); + } + } + + public TypeInfo 
getColumnType() { + return typeInfo; + } + + public String getColumnConfig() { + return columnConfig; + } + + @Override + public String toString() { + return "UberCompressorColumnConfig [name=" + name + + ", columnConfig=" + columnConfig + + ", typeInfo=" + typeInfo + + "]"; + } + + public TypeSpecificCompressor getCompressor() { + return compressor; + } + + public CompressionCodec getCodec() { + return codec; + } + + public SerDe getSerde() { + return serde; + } + + public StandardStructObjectInspector getTypeInfoForSerde() { + return structOfListOfItems; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorConfig.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorConfig.java new file mode 100644 index 0000000..8d46b16 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorConfig.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +public class UberCompressorConfig { + + private static final Log LOG = LogFactory.getLog(UberCompressorConfig.class); + + private static final String COMPRESSION_CONF = "hive.ubercompressor.config.column."; + + private UberCompressorColumnConfig[] columnConfigs; + private StandardStructObjectInspector objectInspector; + + private UberCompressorColumnConfig defaultconfig; + + UberCompressorConfig(Configuration pConf, Properties tbl) throws SerDeException { + + SerDeParameters serdeParams = new SerDeParameters(); + LazyUtils.extractColumnInfo(tbl, serdeParams, "UberCompressor"); + LOG.info("Initializing UberCompressor for schema : " + serdeParams.getColumnNames() + "/" + + serdeParams.getColumnTypes()); + + int size = serdeParams.getColumnNames().size(); + columnConfigs = new UberCompressorColumnConfig[size]; + for (int i = 0; i < size; ++i) { + TypeInfo ti = serdeParams.getColumnTypes().get(i); + if (pConf != null) { + String columnConfig = pConf.get(COMPRESSION_CONF + i); + 
if (columnConfig != null) { + columnConfigs[i] = new UberCompressorColumnConfig(serdeParams.getColumnNames().get(i), + ti, columnConfig); + } else { + columnConfigs[i] = new UberCompressorColumnConfig(serdeParams.getColumnNames().get(i),ti); + } + } else { + columnConfigs[i] = new UberCompressorColumnConfig(ti); + } + LOG.info("UberCompressor config for column[" + i + "] :" + columnConfigs[i]); + } + defaultconfig = new UberCompressorColumnConfig(); + + List fois = new ArrayList(size); + for (TypeInfo ti : serdeParams.getColumnTypes()) { + ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(ti); + fois.add(oi); + } + + objectInspector = + ObjectInspectorFactory.getStandardStructObjectInspector( + serdeParams.getColumnNames(), fois); + } + + public UberCompressorColumnConfig getUberColumnConfig(int colIndex) { + if ((colIndex >= 0) && (colIndex < columnConfigs.length)) { + return columnConfigs[colIndex]; + } else { + return defaultconfig; + } + } + + public int getNumberOfColumns() { + return columnConfigs.length; + } + + public TypeInfo getColumnType(int colIndex) { + return columnConfigs[colIndex].getColumnType(); + } + + public String getColumnConfig(int colIndex) { + return columnConfigs[colIndex].getColumnConfig(); + } + + private static UberCompressorConfig theInstance; + + public static UberCompressorConfig getInstance() { + return theInstance; + } + + public static void setInstance(UberCompressorConfig uberCompressorConfig) { + theInstance = uberCompressorConfig; + } + + public StandardStructObjectInspector getObjectInspector() { + return objectInspector; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerde.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerde.java new file mode 100644 index 0000000..73a86f9 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerde.java @@ -0,0 +1,128 @@ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; + +public class UberCompressorSerde implements SerDe { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(UberCompressorSerde.class); + UberCompressorConfig uberCompressorConfig; + + // serialization related + BytesRefArrayWritable serializeCache; + BytesRefWritable field[]; + ByteStream.Output byteStream = new ByteStream.Output(); + DataOutputStream serializeStream = new 
DataOutputStream(byteStream); + + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + uberCompressorConfig = new UberCompressorConfig(conf, tbl); + int size = uberCompressorConfig.getNumberOfColumns(); + serializeCache = new BytesRefArrayWritable(size); + field = new BytesRefWritable[size]; + for (int i = 0; i < size; i++) { + field[i] = new BytesRefWritable(); + serializeCache.set(i, field[i]); + } + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + if (UberCompressorConfig.getInstance() == null) { + UberCompressorConfig.setInstance(uberCompressorConfig); + } + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + objInspector.getTypeName()); + } + + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + + byteStream.reset(); + int streamOffset = 0; + // Serialize each field + for (int i = 0; i < fields.size(); i++) { + ObjectInspector foi = fields.get(i).getFieldObjectInspector(); + Object f = (list == null ? null : list.get(i)); + try { + if (f != null) { + UberCompressorUtils.writeObject(f, foi, serializeStream); + serializeStream.flush(); + field[i].set(byteStream.getData(), streamOffset, byteStream.getCount() - streamOffset); + streamOffset = byteStream.getCount(); + } else { + field[i].set(byteStream.getData(), streamOffset, 0); + } + } catch (IOException e) { + throw new SerDeException(e); + } + } + return serializeCache; + } + + @Override + public Object deserialize(Writable blob) throws SerDeException { + if (!(blob instanceof BytesRefArrayWritable)) { + throw new SerDeException(getClass().toString() + ": expects BytesRefArrayWritable!"); + } + try { + BytesRefArrayWritable cols = (BytesRefArrayWritable) blob; + StandardStructObjectInspector soi = (StandardStructObjectInspector) getObjectInspector(); + List fields = soi.getAllStructFieldRefs(); + Object struct = soi.create(); + for (int index = 0; index < fields.size(); ++index) { + StructField sf = fields.get(index); + BytesRefWritable brw = cols.get(index); + ByteArrayInputStream bais = new ByteArrayInputStream(brw.getData(), brw.getStart(), + brw.getLength()); + DataInput dataIn = new DataInputStream(bais); + Object f = UberCompressorUtils.readObject(sf.getFieldObjectInspector(), dataIn); + soi.setStructFieldData(struct, sf, f); + } + return struct; + } catch (IOException e) { + throw new SerDeException(e); + } + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return uberCompressorConfig.getObjectInspector(); + } + + @Override + public SerDeStats getSerDeStats() { + //FIXME + return null; + } + + @Override + public Class getSerializedClass() { + return BytesRefArrayWritable.class; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorUtils.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorUtils.java new file mode 100644 index 0000000..9a96588 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorUtils.java @@ -0,0 +1,377 @@ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import 
java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyArray; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyListObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + + +public class UberCompressorUtils { + + private static final Log LOG = LogFactory.getLog(UberCompressorUtils.class); + + private UberCompressorUtils() { + } + + public static Tuple, ObjectInspector> deserializeLazyList(SerDeParameters serdeParams, + TypeInfo typeInfo, byte[] data, int length) { + ObjectInspector lazyElementInspector = LazyFactory.createLazyObjectInspector( + typeInfo, + serdeParams.getSeparators(), 1, serdeParams.getNullSequence(), + serdeParams.isEscaped(), serdeParams.getEscapeChar()); + LazyListObjectInspector loi = LazyObjectInspectorFactory.getLazySimpleListObjectInspector( + lazyElementInspector, + serdeParams.getSeparators()[0], serdeParams.getNullSequence(), + serdeParams.isEscaped(), serdeParams.getEscapeChar()); + LazyArray lazyArray = (LazyArray) LazyFactory.createLazyObject(loi); + ByteArrayRef bar = new ByteArrayRef(); + bar.setData(data); + lazyArray.init(bar, 0, length); + int numElements = lazyArray.getListLength(); + List rc = new ArrayList(numElements); + for (int i = 0; i < numElements; ++i) { + Object o = lazyArray.getListElementObject(i); + rc.add(o); + } + return new Tuple, ObjectInspector>(rc, lazyElementInspector); + } + + public static void writeInt(OutputStream byteStream, int v) throws IOException { + byteStream.write((v >>> 24) & 0xFF); + byteStream.write((v >>> 16) & 0xFF); + 
byteStream.write((v >>> 8) & 0xFF); + byteStream.write((v >>> 0) & 0xFF); + } + + public static void writeObject(Object obj, ObjectInspector objInspector, DataOutput out) + throws IOException { + if (obj == null) { // nulls are handled via a 0-length value + return; + } + switch (objInspector.getCategory()) { + case PRIMITIVE: { + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) objInspector; + switch (poi.getPrimitiveCategory()) { + case VOID: { + return; + } + case BOOLEAN: { + boolean v = ((BooleanObjectInspector) poi).get(obj); + out.writeBoolean(v); + return; + } + case BYTE: { + ByteObjectInspector boi = (ByteObjectInspector) poi; + byte v = boi.get(obj); + out.writeByte(v); + return; + } + case SHORT: { + ShortObjectInspector spoi = (ShortObjectInspector) poi; + short v = spoi.get(obj); + out.writeShort(v); + return; + } + case INT: { + IntObjectInspector ioi = (IntObjectInspector) poi; + int v = ioi.get(obj); + out.writeInt(v); + return; + } + case LONG: { + LongObjectInspector loi = (LongObjectInspector) poi; + long v = loi.get(obj); + out.writeLong(v); + return; + } + case FLOAT: { + FloatObjectInspector foi = (FloatObjectInspector) poi; + float v = foi.get(obj); + out.writeFloat(v); + return; + } + case DOUBLE: { + DoubleObjectInspector doi = (DoubleObjectInspector) poi; + double v = doi.get(obj); + out.writeDouble(v); + return; + } + case STRING: { + StringObjectInspector soi = (StringObjectInspector) poi; + String v = soi.getPrimitiveJavaObject(obj); + out.writeUTF(v); + return; + } + case TIMESTAMP: { + TimestampObjectInspector toi = (TimestampObjectInspector) poi; + TimestampWritable t = toi.getPrimitiveWritableObject(obj); + t.write((OutputStream) out); + return; + } + default: { + throw new RuntimeException("Unrecognized type: " + poi.getPrimitiveCategory()); + } + } + } + case LIST: { + ListObjectInspector loi = (ListObjectInspector) objInspector; + ObjectInspector eoi = loi.getListElementObjectInspector(); + + int size = loi.getListLength(obj); + out.writeInt(size); + + // write the null bytes + byte nullByte = 0; + for (int eid = 0; eid < size; eid++) { + // set the bit to 1 if an element is not null + if (null != loi.getListElement(obj, eid)) { + nullByte |= 1 << (eid % 8); + } + // store the byte every eight elements or + // if this is the last element + if (7 == eid % 8 || eid == size - 1) { + out.write(nullByte); + nullByte = 0; + } + } + + // write element by element from the list + for (int eid = 0; eid < size; eid++) { + writeObject(loi.getListElement(obj, eid), eoi, out); + } + return; + } + case MAP: { + MapObjectInspector moi = (MapObjectInspector) objInspector; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + Map map = moi.getMap(obj); + + // write the size of the map which is a VInt + int size = map.size(); + out.writeInt(size); + + // write the null bytes + int b = 0; + byte nullByte = 0; + for (Map.Entry entry : map.entrySet()) { + // set the bit to 1 if a key is not null + if (null != entry.getKey()) { + nullByte |= 1 << (b % 8); + } + b++; + // set the bit to 1 if a value is not null + if (null != entry.getValue()) { + nullByte |= 1 << (b % 8); + } + b++; + // write the byte to stream every 4 key-value pairs + // or if this is the last key-value pair + if (0 == b % 8 || b == size * 2) { + out.write(nullByte); + nullByte = 0; + } + } + + // write key-value pairs one by one + for (Map.Entry entry : map.entrySet()) { + writeObject(entry.getKey(), koi, out); + 
writeObject(entry.getValue(), voi, out); + } + + return; + } + case STRUCT: { + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + int size = fields.size(); + int lasti = 0; + byte nullByte = 0; + + for (int i = 0; i < size; i++) { + // set bit to 1 if a field is not null + if (null != soi.getStructFieldData(obj, fields.get(i))) { + nullByte |= 1 << (i % 8); + } + // write the null byte every eight elements or + // if this is the last element and serialize the + // corresponding 8 struct fields at the same time + if (7 == i % 8 || i == size - 1) { + out.write(nullByte); + for (int j = lasti; j <= i; j++) { + writeObject(soi.getStructFieldData(obj, fields.get(j)), fields.get(j) + .getFieldObjectInspector(), out); + } + lasti = i + 1; + nullByte = 0; + } + } + return; + } + default: { + throw new RuntimeException("Unrecognized type: " + objInspector.getCategory()); + } + } + } + + public static Object readObject(ObjectInspector objInspector, DataInput in) throws IOException { + try { + switch (objInspector.getCategory()) { + case PRIMITIVE: { + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) objInspector; + switch (poi.getPrimitiveCategory()) { + case VOID: { + return null; + } + case BOOLEAN: { + return in.readBoolean(); + } + case BYTE: { + return in.readByte(); + } + case SHORT: { + return in.readShort(); + } + case INT: { + return in.readInt(); + } + case LONG: { + return in.readLong(); + } + case FLOAT: { + return in.readFloat(); + } + case DOUBLE: { + return in.readDouble(); + } + case STRING: { + return in.readUTF(); + } + case TIMESTAMP: { + TimestampWritable ts = new TimestampWritable(); + ts.readFields(in); + } + default: { + throw new RuntimeException("Unrecognized type: " + poi.getPrimitiveCategory()); + } + } + } + case LIST: { + StandardListObjectInspector loi = (StandardListObjectInspector) objInspector; + ObjectInspector eoi = loi.getListElementObjectInspector(); + + int size = in.readInt(); + Object l = loi.create(size); + + int numNullBytes = (size + 7) / 8; + byte[] nullBytes = new byte[numNullBytes]; + for (int eid = 0; eid < numNullBytes; ++eid) { + nullBytes[eid] = in.readByte(); + } + + // read element by element from the list + int nullByteCur = 0; + for (int eid = 0; eid < size; eid++) { + if ((nullBytes[nullByteCur] & (1 << (eid % 8))) != 0) { + loi.set(l, eid, readObject(eoi, in)); + } else { + loi.set(l, eid, null); + } + // move onto the next null byte + if (7 == (eid % 8)) { + nullByteCur++; + } + } + return l; + } + case MAP: { + StandardMapObjectInspector moi = (StandardMapObjectInspector) objInspector; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + int size = in.readInt(); + Object m = moi.create(); + + int numNullBytes = (size * 2 + 7) / 8; + byte[] nullBytes = new byte[numNullBytes]; + for (int eid = 0; eid < numNullBytes; ++eid) { + nullBytes[eid] = in.readByte(); + } + + // read key-value pairs one by one + int nullByteCur = 0; + for (int eid = 0; eid < size; eid++) { + Object key = null; + Object value = null; + if ((nullBytes[nullByteCur] & (1 << ((eid * 2) % 8))) != 0) { + key = readObject(koi, in); + } + if ((nullBytes[nullByteCur] & (1 << ((eid * 2 + 1) % 8))) != 0) { + value = readObject(voi, in); + } + moi.put(m, key, value); + } + return m; + } + case STRUCT: { + StandardStructObjectInspector soi = (StandardStructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + int size = 
fields.size();
+        byte nullByte = in.readByte();
+        Object s = soi.create();
+        for (int i = 0; i < size; i++) {
+          StructField field = fields.get(i);
+          if ((nullByte & (1 << (i % 8))) != 0) {
+            soi.setStructFieldData(s, field, readObject(field.getFieldObjectInspector(), in));
+          } else {
+            soi.setStructFieldData(s, field, null);
+          }
+          if (7 == (i % 8)) {
+            nullByte = in.readByte();
+          }
+        }
+        return s;
+      }
+      default: {
+        throw new RuntimeException("Unrecognized type: " + objInspector.getCategory());
+      }
+      }
+    } catch (EOFException e) {
+      return null;
+    }
+  }
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/dsalg/Tuple.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/dsalg/Tuple.java
new file mode 100644
index 0000000..5275f36
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/dsalg/Tuple.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.ubercompressor.dsalg;
+
+/**
+ *
+ * Tuple. A simple pair class.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ */
+public class Tuple<T1, T2> {
+  private T1 mT1;
+  private T2 mT2;
+
+  public Tuple(T1 t1, T2 t2) {
+    mT1 = t1;
+    mT2 = t2;
+  }
+
+  public T1 getFirst() {
+    return mT1;
+  }
+
+  public T2 getSecond() {
+    return mT2;
+  }
+
+  public void setFirst(T1 t1) {
+    mT1 = t1;
+  }
+
+  public void setSecond(T2 t2) {
+    mT2 = t2;
+  }
+}
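
For reference, a minimal implementation sketch of the three interfaces added by this patch (TypeSpecificCompressor, OutputWriter, InputReader). This is not part of the patch: the class name DeltaIntCompressor and its package are hypothetical, delta encoding is just an example strategy, and the Tuple<Boolean, T> contract of read() is assumed to carry the EOF flag in getFirst() and the value in getSecond(), matching how UberCompressionInputStream only calls getSecond().

package org.apache.hadoop.hive.contrib.ubercompressor.example;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.hive.contrib.ubercompressor.InputReader;
import org.apache.hadoop.hive.contrib.ubercompressor.OutputWriter;
import org.apache.hadoop.hive.contrib.ubercompressor.TypeSpecificCompressor;
import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple;

public class DeltaIntCompressor implements TypeSpecificCompressor<Integer> {

  @Override
  public OutputWriter<Integer> createOutputWriter(final OutputStream out) throws IOException {
    return new OutputWriter<Integer>() {
      private final DataOutputStream dout = new DataOutputStream(out);
      private int previous = 0;

      @Override
      public void write(Integer value) throws IOException {
        // store the difference from the previous value; small deltas are cheap to encode
        // note: nulls are not preserved by this sketch (a delta of 0 is written instead)
        int v = (value == null) ? previous : value;
        dout.writeInt(v - previous);
        previous = v;
      }

      @Override
      public long close(boolean closeUnderlying) throws IOException {
        dout.flush();
        long bits = 8L * dout.size();
        if (closeUnderlying) {
          dout.close();
        }
        return bits;
      }
    };
  }

  @Override
  public InputReader<Integer> createInputReader(final InputStream in, long numBits)
      throws IOException {
    return new InputReader<Integer>() {
      private final DataInputStream din = new DataInputStream(in);
      private int previous = 0;

      @Override
      public Tuple<Boolean, Integer> read() throws IOException {
        try {
          previous += din.readInt();
          // assumption: first == FALSE means "not at EOF", second carries the value
          return new Tuple<Boolean, Integer>(Boolean.FALSE, previous);
        } catch (EOFException e) {
          return new Tuple<Boolean, Integer>(Boolean.TRUE, null);
        }
      }

      @Override
      public void close(boolean closeUnderLying) throws IOException {
        if (closeUnderLying) {
          din.close();
        }
      }
    };
  }
}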
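
Per-column behaviour is driven entirely by configuration: UberCompressorConfig looks up hive.ubercompressor.config.column.<index>, and UberCompressorColumnConfig parses either compressor:<class> or codec:<codecClass>[,<serdeClass>], falling back to codec:org.apache.hadoop.io.compress.BZip2Codec. A hypothetical wiring sketch follows; the example class name and the DeltaIntCompressor reference are illustrative, and how the Configuration reaches the job (table properties, SET commands, etc.) is outside this patch.

import org.apache.hadoop.conf.Configuration;

public class UberCompressorConfigExample {
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // column 0: handled by a type-specific compressor (the hypothetical sketch above)
    conf.set("hive.ubercompressor.config.column.0",
        "compressor:org.apache.hadoop.hive.contrib.ubercompressor.example.DeltaIntCompressor");
    // column 1: values re-serialized with LazySimpleSerDe, then compressed with GzipCodec
    conf.set("hive.ubercompressor.config.column.1",
        "codec:org.apache.hadoop.io.compress.GzipCodec,"
            + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
    // any column without an entry falls back to codec:org.apache.hadoop.io.compress.BZip2Codec
    return conf;
  }
}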
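
The intermediate column format exchanged between UberCompressorSerde and the compression streams is simply UberCompressorUtils.writeObject/readObject applied value by value. A small self-contained round trip, assuming Hive's standard Java primitive object inspectors (RoundTripExample is an illustrative name, not part of the patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class RoundTripExample {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);

    // write an int and a string the same way the serde writes column values
    UberCompressorUtils.writeObject(42,
        PrimitiveObjectInspectorFactory.javaIntObjectInspector, out);
    UberCompressorUtils.writeObject("hello",
        PrimitiveObjectInspectorFactory.javaStringObjectInspector, out);
    out.flush();

    // read them back with the matching object inspectors
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    Integer i = (Integer) UberCompressorUtils.readObject(
        PrimitiveObjectInspectorFactory.javaIntObjectInspector, in);
    String s = (String) UberCompressorUtils.readObject(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector, in);
    System.out.println(i + " " + s); // 42 hello
  }
}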