diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/InputReader.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/InputReader.java new file mode 100644 index 0000000..fdaee67 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/InputReader.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.IOException; + +import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * + * InputReader. a reader for instances + * + */ +public interface InputReader { + /** + * get the objectinspector to be used to inspect the returned objects + * + * @return the objectinspector to be used to inspect the returned objects + */ + public ObjectInspector getObjectInspector(); + /** + * + * read in an instance. + * + * @return a tuple indicating whether we have hit EOF, and if not, the instance read + * + * @throws IOException + */ + public Tuple read() throws IOException; + /** + * + * Close the reader + * + * @param closeUnderLying whether to close the underlying inputstream + * + * @throws IOException + */ + public void close(boolean closeUnderLying) throws IOException; +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/OutputWriter.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/OutputWriter.java new file mode 100644 index 0000000..bd800e3 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/OutputWriter.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.IOException; +/** + * + * OutputWriter. A writer for instances + * + */ +public interface OutputWriter { + /** + * Pass an instance to be written to the outputstream. 
The instance may be cached internally and
+   * may actually be written at some future time, but will be written by the time close returns.
+   * Null values can be legally written into the stream.
+   *
+   * @param t the instance
+   * @throws IOException
+   */
+  public void write(Object t) throws IOException;
+  /**
+   * Close the writer. The writer will write and release any cached instances when closed.
+   *
+   * @param closeUnderlying propagate the close to the underlying stream
+   * @return the number of bits written, -1 if unknown
+   * @throws IOException
+   */
+  public long close(boolean closeUnderlying) throws IOException;
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/TypeSpecificCompressor.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/TypeSpecificCompressor.java
new file mode 100644
index 0000000..b0477fd
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/TypeSpecificCompressor.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.ubercompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ *
+ * The interface for a type-specific compressor
+ * @param the type
+ */
+public interface TypeSpecificCompressor {
+
+  /**
+   * Create a writer for a stream of instances of this type
+   *
+   * @param out the outputstream to write into
+   * @param oi the object inspector to inspect the objects to be written
+   * @return the writer instance
+   * @throws IOException
+   */
+  public OutputWriter createOutputWriter(OutputStream out, ObjectInspector oi) throws IOException;
+
+  /**
+   *
+   * Create a reader for a stream of instances of this type
+   *
+   * @param in the inputstream to read from
+   * @param numBits the number of bits of data to consume from the inputstream, -1 if unknown
+   * @return the reader instance
+   * @throws IOException
+   */
+  public InputReader createInputReader(InputStream in, long numBits)
+      throws IOException;
+}
diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionCodec.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionCodec.java
new file mode 100644
index 0000000..91836c3
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionCodec.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
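Before the codec itself, a quick usage sketch of the three interfaces just introduced, wired to the DummyIntegerCompressor that appears later in this patch. The class name RoundTripExample and the sample values are illustrative only, not part of the patch; the raw Tuple is used because the generic parameters were stripped in this copy of the diff, and the tuple's first slot is treated as the EOF flag per the InputReader javadoc.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.contrib.ubercompressor.InputReader;
import org.apache.hadoop.hive.contrib.ubercompressor.OutputWriter;
import org.apache.hadoop.hive.contrib.ubercompressor.TypeSpecificCompressor;
import org.apache.hadoop.hive.contrib.ubercompressor.compressors.DummyIntegerCompressor;
import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class RoundTripExample {
  public static void main(String[] args) throws IOException {
    TypeSpecificCompressor compressor = new DummyIntegerCompressor();
    ObjectInspector intOI =
        PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.INT);

    // Write a handful of ints; close(false) flushes but keeps the underlying stream open.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    OutputWriter writer = compressor.createOutputWriter(bytes, intOI);
    for (int i = 0; i < 5; i++) {
      writer.write(Integer.valueOf(i * 10));
    }
    writer.close(false);

    // Read until the tuple's first slot reports EOF; the second slot is the value read.
    InputReader reader =
        compressor.createInputReader(new ByteArrayInputStream(bytes.toByteArray()), -1);
    Tuple t = reader.read();
    while (!Boolean.TRUE.equals(t.getFirst())) {
      System.out.println(t.getSecond());
      t = reader.read();
    }
    reader.close(true);
  }
}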
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; + +public class UberCompressionCodec implements CompressionCodec, Configurable { + + @Override + public Compressor createCompressor() { + return new DummyCompressor(); + } + + private static class DummyCompressor implements Compressor { + + @Override + public void setInput(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean needsInput() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public long getBytesRead() { + throw new UnsupportedOperationException(); + } + + @Override + public long getBytesWritten() { + throw new UnsupportedOperationException(); + } + + @Override + public void finish() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean finished() { + throw new UnsupportedOperationException(); + } + + @Override + public int compress(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + } + + @Override + public void end() { + throw new UnsupportedOperationException(); + } + } + + @Override + public Decompressor createDecompressor() { + return new DummyDecompressor(); + } + + private static class DummyDecompressor implements Decompressor { + + @Override + public void setInput(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean needsInput() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean needsDictionary() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean finished() { + throw new UnsupportedOperationException(); + } + + @Override + public int decompress(byte[] b, int off, int len) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + } + + @Override + public void end() { + throw new UnsupportedOperationException(); + } + + } + + @Override + public Class getCompressorType() { + return DummyCompressor.class; + } + + @Override + public Class getDecompressorType() { + return DummyDecompressor.class; + } + + @Override + public CompressionInputStream createInputStream(InputStream arg0) throws 
IOException { + return new UberCompressionInputStream(conf, arg0); + } + + @Override + public CompressionInputStream createInputStream(InputStream arg0, Decompressor arg1) + throws IOException { + return new UberCompressionInputStream(conf, arg0); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream arg0) throws IOException { + return new UberCompressionOutputStream(conf, arg0); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream arg0, Compressor arg1) + throws IOException { + return new UberCompressionOutputStream(conf, arg0); + } + + @Override + public String getDefaultExtension() { + return ".uz"; + } + + private Configuration conf; + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration arg0) { + conf = arg0; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionInputStream.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionInputStream.java new file mode 100644 index 0000000..29f345e --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionInputStream.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple; +import org.apache.hadoop.hive.ql.io.SchemaAwareCompressionInputStream; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; + +public class UberCompressionInputStream extends SchemaAwareCompressionInputStream { + + private static final Log LOG = LogFactory.getLog(UberCompressionInputStream.class); + + private int mColumnNumber = -1; + + ByteStream.Output byteOut; + private byte[] buf; + + private Configuration conf; + + public UberCompressionInputStream(Configuration conf, InputStream arg0) throws IOException { + super(arg0); + this.conf = conf; + } + + @Override + public void setColumnIndex(int columnIndex) { + mColumnNumber = columnIndex; + } + + @Override + public void resetState() throws IOException { + } + + @Override + public int read(byte[] data, int offset, int length) throws IOException { + LOG.info("Reading " + length + " bytes for colindex " + mColumnNumber); + try { + if (mColumnNumber == -1) { + UberCompressorColumnConfig defaultConfig = new UberCompressorColumnConfig(conf); + CompressionCodec codec = defaultConfig.getCodec(); + CompressionInputStream cin = codec.createInputStream(in); + cin.resetState(); + readFully(data, offset, length, cin); + return length; + } + DataInput dataIn = new DataInputStream(in); + String tis = dataIn.readUTF(); + String colcon = dataIn.readUTF(); + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(tis); + UberCompressorColumnConfig columnConfig = new UberCompressorColumnConfig(mColumnNumber, + typeInfo, colcon, conf); + if (byteOut == null) { + byteOut = new ByteStream.Output(); + } else { + byteOut.reset(); + } + UberCompressorSerdeField field = new UberCompressorSerdeField(typeInfo, true); + TypeSpecificCompressor compressor = columnConfig.getCompressor(); + if (compressor != null) { + ObjectInspector foi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo); + readFromCompressor(compressor, foi, field, length); + // FIXME: need an outputstream on existing buffer + System.arraycopy(byteOut.getData(), 0, data, 0, length); + return length; + } else { + SerDe serde = columnConfig.getSerde(); + if (serde != null) { + int serializedLength = dataIn.readInt(); + enlargeBuffer(serializedLength); + CompressionCodec codec = columnConfig.getCodec(); + CompressionInputStream cin = codec.createInputStream(in); + readFully(buf, 0, serializedLength, cin); + DataInput din = new DataInputStream(new ByteArrayInputStream(buf, 0, serializedLength)); + Writable w = serde.getSerializedClass().newInstance(); + 
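+          // Serde path: the payload is a single Writable produced by the configured
+          // serde, wrapping a struct whose only field is a list of this column's
+          // values. Each list element is re-encoded below into the LazySimpleSerDe
+          // field format before being copied out to the caller's buffer.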
w.readFields(din); + Object o = serde.deserialize(w); + StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector(); + Object list = soi.getStructFieldsDataAsList(o).get(0); + ListObjectInspector loi = (ListObjectInspector) soi.getAllStructFieldRefs().get(0) + .getFieldObjectInspector(); + ObjectInspector oi = loi.getListElementObjectInspector(); + for (Object element : loi.getList(list)) { + field.writeObject(element, oi, byteOut); + } + System.arraycopy(byteOut.getData(), 0, data, 0, length); + return length; + } else { + CompressionCodec codec = columnConfig.getCodec(); + CompressionInputStream cin = codec.createInputStream(in); + cin.resetState(); + readFully(data, offset, length, cin); + return length; + } + } + } catch (SerDeException e) { + throw new IOException(e); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + } + + private void readFully(byte[] data, int offset, int length, CompressionInputStream cin) + throws IOException, EOFException { + int read = 0; + while (read < length) { + int read_now = cin.read(data, offset + read, length - read); + if (read_now < 0) { + throw new EOFException("Trying to read " + length + + " bytes from the compressed stream, got only " + read + " bytes."); + } else { + read += read_now; + } + } + } + + private void readFromCompressor(TypeSpecificCompressor compressor, ObjectInspector foi, + UberCompressorSerdeField field, int length) throws IOException { + InputReader ir = compressor.createInputReader(in, -1); + while (byteOut.getCount() < length) { + Tuple i = ir.read(); + if (i.getFirst()) { + throw new IOException("EOF reached before we could read " + length + " bytes"); + } + field.writeObject(i.getSecond(), foi, byteOut); + } + ir.close(false); + } + + private void enlargeBuffer(int length) { + if ((buf == null) || buf.length < length) { + buf = new byte[length]; + } + } + + @Override + public int read() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public long skip(long n) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int available() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized void mark(int readlimit) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized void reset() throws IOException { + } + + @Override + public boolean markSupported() { + return false; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionOutputStream.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionOutputStream.java new file mode 100644 index 0000000..c4e1502 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressionOutputStream.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
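The read path just shown and the write path in UberCompressionOutputStream further down agree on a small per-column header: two modified-UTF strings, the column's type name and its config string, exchanged with DataOutputStream.writeUTF / DataInputStream.readUTF ahead of the payload. A minimal, self-contained sketch of that framing; the class name and literal values are illustrative, not taken from the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ColumnHeaderFraming {
  public static void main(String[] args) throws IOException {
    // Write side (see UberCompressionOutputStream.doWrite): each column block starts
    // with the column's type name and its config string, then the payload produced
    // by the compressor, serde+codec, or plain codec path.
    ByteArrayOutputStream block = new ByteArrayOutputStream();
    DataOutputStream header = new DataOutputStream(block);
    header.writeUTF("int");                                            // type name
    header.writeUTF("codec:org.apache.hadoop.io.compress.BZip2Codec"); // column config
    header.flush();
    // ... payload bytes would follow here ...

    // Read side (see UberCompressionInputStream.read): the same two strings are read
    // back to rebuild an UberCompressorColumnConfig before decoding the payload.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(block.toByteArray()));
    String typeName = in.readUTF();
    String columnConfig = in.readUTF();
    System.out.println(typeName + " / " + columnConfig);
  }
}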
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple; +import org.apache.hadoop.hive.ql.io.SchemaAwareCompressionOutputStream; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionOutputStream; + +public class UberCompressionOutputStream extends SchemaAwareCompressionOutputStream { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(UberCompressionOutputStream.class); + + private static final String COMPRESSION_CONF = "hive.ubercompressor.config.column."; + + private final ByteStream.Output byteOut; + private final Configuration conf; + + private int mColumnNumber = -1; + private boolean resetPending = true; + private UberCompressorColumnConfig columnConfig; + + private CompressionOutputStream mCos; // used for -1 mColumn + + public UberCompressionOutputStream(Configuration conf, OutputStream arg0) throws IOException { + super(arg0); + byteOut = new ByteStream.Output(); + this.conf = conf; + } + + @Override + public void finish() throws IOException { + if (mColumnNumber != -1) { + doWrite(); + } else { + if (mCos != null) { + mCos.finish(); + } + } + } + + @Override + public void resetState() throws IOException { + resetPending = true; + } + + @Override + public void setColumnIndex(int columnIndex) { + if (mColumnNumber != columnIndex) { + mCos = null; + resetPending = true; + mColumnNumber = columnIndex; + columnConfig = null; + } + } + + private void initializeColumnConfig() throws SerDeException { + UberCompressorConfig uberCompressorConfig = UberCompressorConfig.getInstance(); + if (mColumnNumber != -1) { + TypeInfo ti = uberCompressorConfig.getColumnType(mColumnNumber); + String colConfig = conf.get(COMPRESSION_CONF + mColumnNumber); + if (colConfig != null) { + columnConfig = new UberCompressorColumnConfig( + uberCompressorConfig.getColumnName(mColumnNumber), + ti, colConfig, conf); + } else { + columnConfig = new UberCompressorColumnConfig( + 
uberCompressorConfig.getColumnName(mColumnNumber), ti, conf); + } + } else { + columnConfig = new UberCompressorColumnConfig(conf); + } + } + + + private void doReset() throws IOException { + try { + if (columnConfig == null) { + initializeColumnConfig(); + } + } catch (SerDeException e) { + throw new IOException(e); + } + if (mColumnNumber == -1) { + if (mCos != null) { + mCos.resetState(); + } else { + CompressionCodec codec = columnConfig.getCodec(); + mCos = codec.createOutputStream(out); + } + } else { + byteOut.reset(); + } + resetPending = false; + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + if (resetPending) { + doReset(); + } + if (mColumnNumber == -1) { + mCos.write(data, offset, length); + return; + } else { + byteOut.write(data, offset, length); + } + } + + private void doWrite() throws IOException { + try { + byte[] data = byteOut.getData(); + int length = byteOut.getCount(); + if (length == 0) { + return; // nothing to do + } else { + TypeInfo typeInfo = columnConfig.getColumnType(); + String tis = typeInfo.getTypeName(); + DataOutputStream dataOutput = new DataOutputStream(out); + dataOutput.writeUTF(tis); + String colcon = columnConfig.getColumnConfig(); + dataOutput.writeUTF(colcon); + dataOutput.flush(); + TypeSpecificCompressor compressor = columnConfig.getCompressor(); + ObjectInspector foi = UberCompressorSerdeField.getObjectInspector(typeInfo); + UberCompressorSerdeField field = new UberCompressorSerdeField(typeInfo, false); + if (compressor != null) { + OutputWriter ow = compressor.createOutputWriter(out, foi); + int start = 0; + Tuple, Integer> t = field.readObject(data, start, -1); + while (t != null) { + ow.write(t.getFirst()); + start += t.getSecond(); + t = field.readObject(data, start, -1); + } + ow.close(false); + } else { + SerDe serde = columnConfig.getSerde(); + if (serde != null) { + ArrayList list = new ArrayList(); + int start = 0; + Tuple, Integer> t = field.readObject(data, start, -1); + while (t != null) { + LazyObject lo = t.getFirst(); + Object stdO = ObjectInspectorUtils.copyToStandardObject(lo, + foi, + ObjectInspectorCopyOption.JAVA); + list.add(stdO); + start += t.getSecond(); + t = field.readObject(data, start, -1); + } + StandardListObjectInspector loi = + ObjectInspectorFactory.getStandardListObjectInspector(foi); + Object o = loi.create(list.size()); + for (int index = 0; index < list.size(); index++) { + loi.set(o, index, list.get(index)); + } + StandardStructObjectInspector soi = columnConfig.getTypeInfoForSerde(); + Object s = soi.create(); + soi.setStructFieldData(s, soi.getAllStructFieldRefs().get(0), o); + Writable w = serde.serialize(s, soi); + byteOut.reset(); + DataOutput serialized = new DataOutputStream(byteOut); + w.write(serialized); + dataOutput.writeInt(byteOut.getCount()); + dataOutput.flush(); + CompressionCodec codec = columnConfig.getCodec(); + CompressionOutputStream cos = codec.createOutputStream(out); + cos.write(byteOut.getData(), 0, byteOut.getCount()); + cos.finish(); + } else { + CompressionCodec codec = columnConfig.getCodec(); + CompressionOutputStream cos = codec.createOutputStream(out); + cos.write(data, 0, length); + cos.finish(); + } + byteOut.reset(); + } + } + } catch (SerDeException e) { + throw new IOException(e); + } + } + + @Override + public void write(int b) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } +} diff --git 
contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorColumnConfig.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorColumnConfig.java new file mode 100644 index 0000000..62a30cd --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorColumnConfig.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.util.ReflectionUtils; + +public class UberCompressorColumnConfig { + + private static final Log LOG = LogFactory.getLog(UberCompressorColumnConfig.class); + + private String name; + private String columnConfig; + private TypeInfo typeInfo; + + private TypeSpecificCompressor compressor; + private CompressionCodec codec; + private SerDe serde; + private StandardStructObjectInspector structOfListOfItems; + + public UberCompressorColumnConfig(Configuration co) throws SerDeException { + this(-1, null, co); + } + + public UberCompressorColumnConfig(int columnNo, TypeInfo ti, Configuration co) + throws SerDeException { + this("col#" + columnNo, ti, "", co); + } + + public UberCompressorColumnConfig(String name, TypeInfo ti, Configuration co) + throws SerDeException { + this(name, ti, "", co); + } + + UberCompressorColumnConfig(int columnNo, TypeInfo typeInfo, String columnConfig, Configuration co) + throws SerDeException { + this("col#" + columnNo, typeInfo, columnConfig, co); + } + + public UberCompressorColumnConfig(String name, TypeInfo ti, String columnConfig, Configuration co) + throws SerDeException { + this.name = name; + this.columnConfig = columnConfig; + this.typeInfo = ti; + initializeColumnConfig(co); + LOG.info("Column:" + this.name + " initialized as per config:" + this.columnConfig); + } + + private void initializeColumnConfig(Configuration co) + throws SerDeException { 
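+    // The column config string takes one of these forms (parsed below):
+    //   "compressor:<TypeSpecificCompressor class>"     - type-specific compression
+    //   "codec:<CompressionCodec class>"                - generic codec over the raw column bytes
+    //   "codec:<CompressionCodec class>,<SerDe class>"  - re-serialize through the serde, then codec
+    //   "" (empty)                                      - defaults to codec:org.apache.hadoop.io.compress.BZip2Codec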
+ try { + if (columnConfig.startsWith("compressor:")) { + String compressorName = columnConfig.substring(11); + compressor = (TypeSpecificCompressor) Class.forName(compressorName).newInstance(); + } else { + if (columnConfig.startsWith("codec:")) { + String codecName = columnConfig.substring(6); + int ci = codecName.indexOf(','); + if (ci != -1) { + String serdeName = codecName.substring(ci + 1); + serde = (SerDe) Class.forName(serdeName).newInstance(); + Properties tbl = new Properties(); + ObjectInspector listElementObjectInspector = + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo); + ObjectInspector sloi = + ObjectInspectorFactory.getStandardListObjectInspector(listElementObjectInspector); + List names = Arrays.asList("listfield"); + structOfListOfItems = + ObjectInspectorFactory.getStandardStructObjectInspector(names, Arrays.asList(sloi)); + TypeInfo list = TypeInfoFactory.getListTypeInfo(typeInfo); + tbl.setProperty(Constants.LIST_COLUMNS, "listOf" + name); + tbl.setProperty(Constants.LIST_COLUMN_TYPES, list.getTypeName()); + tbl.put(Constants.ESCAPE_CHAR, "\n"); // For lazysimpleserde + serde.initialize(null, tbl); + codecName = codecName.substring(0, ci); + } + codec = (CompressionCodec) Class.forName(codecName).newInstance(); + ReflectionUtils.setConf(codec, co); + } else { + if (columnConfig.isEmpty()) { + columnConfig = "codec:org.apache.hadoop.io.compress.BZip2Codec"; + codec = new BZip2Codec(); + ReflectionUtils.setConf(codec, co); + } else { + throw new SerDeException("Unsupported type for compressor: " + columnConfig); + } + } + } + } catch (ClassNotFoundException e) { + throw new SerDeException(e); + } catch (IllegalAccessException e) { + throw new SerDeException(e); + } catch (InstantiationException e) { + throw new SerDeException(e); + } + } + + public TypeInfo getColumnType() { + return typeInfo; + } + + public String getColumnConfig() { + return columnConfig; + } + + @Override + public String toString() { + return "UberCompressorColumnConfig [name=" + name + + ", columnConfig=" + columnConfig + + ", typeInfo=" + typeInfo + + "]"; + } + + public TypeSpecificCompressor getCompressor() { + return compressor; + } + + public CompressionCodec getCodec() { + return codec; + } + + public SerDe getSerde() { + return serde; + } + + public StandardStructObjectInspector getTypeInfoForSerde() { + return structOfListOfItems; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorConfig.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorConfig.java new file mode 100644 index 0000000..d40b2f4 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorConfig.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +public class UberCompressorConfig { + + private static final Log LOG = LogFactory.getLog(UberCompressorConfig.class); + + private StandardStructObjectInspector objectInspector; + private SerDeParameters serdeParams; + private int size; + + UberCompressorConfig(Properties tbl) throws SerDeException { + serdeParams = new SerDeParameters(); + LazyUtils.extractColumnInfo(tbl, serdeParams, "UberCompressor"); + + List columnTypes = serdeParams.getColumnTypes(); + size = columnTypes.size(); + LOG.info("Initializing UberCompressor for schema : " + serdeParams.getColumnNames() + "/" + + columnTypes); + List fois = new ArrayList(size); + for (TypeInfo ti : columnTypes) { + ObjectInspector oi = UberCompressorSerdeField.getObjectInspector(ti); + fois.add(oi); + } + objectInspector = + ObjectInspectorFactory.getStandardStructObjectInspector( + serdeParams.getColumnNames(), fois); + } + + public int getNumberOfColumns() { + return size; + } + + public String getColumnName(int col) { + return serdeParams.getColumnNames().get(col); + } + + public TypeInfo getColumnType(int col) { + return serdeParams.getColumnTypes().get(col); + } + + public StandardStructObjectInspector getObjectInspector() { + return objectInspector; + } + + private static UberCompressorConfig theInstance; + + public static UberCompressorConfig getInstance() { + return theInstance; + } + + public static void setInstance(UberCompressorConfig uberCompressorConfig) { + theInstance = uberCompressorConfig; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerde.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerde.java new file mode 100644 index 0000000..73f9a9f --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerde.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
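A sketch of the schema handoff above: the serde in the next file builds an UberCompressorConfig from the table's column properties and publishes it through the static setInstance/getInstance pair, which UberCompressionOutputStream later consults for column names and types. The class name SchemaHandoffSketch is mine; the sketch assumes it sits in the same package because the UberCompressorConfig constructor is package-private, and the complex column types are reconstructed guesses since the angle-bracketed type parameters were lost in this copy of the patch.

package org.apache.hadoop.hive.contrib.ubercompressor;

import java.util.Properties;

import org.apache.hadoop.hive.serde2.SerDeException;

public class SchemaHandoffSketch {
  public static void main(String[] args) throws SerDeException {
    // Column names follow the ubercompressor.q test table; the standard
    // "columns"/"columns.types" keys are what LazyUtils.extractColumnInfo reads.
    Properties tbl = new Properties();
    tbl.setProperty("columns", "a,b,c,d,e");
    tbl.setProperty("columns.types", "array<int>,array<string>,map<string,string>,int,string");

    UberCompressorConfig schema = new UberCompressorConfig(tbl);
    UberCompressorConfig.setInstance(schema); // published for the output stream to pick up

    UberCompressorConfig current = UberCompressorConfig.getInstance();
    System.out.println(current.getNumberOfColumns() + " columns; column 3 is "
        + current.getColumnType(3).getTypeName());
  }
}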
+ */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; + +/** + * UberCompressorSerde. This serde works in conjunction with UberCompressor to pass on schema + * information. It is very similar to columnarserde in operation, using lazysimpleserde methods + * to serialize and producing a ByteRefArrayWritable. + * + */ +public class UberCompressorSerde implements SerDe { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(UberCompressorSerde.class); + + // serialization/deserialization related + private UberCompressorConfig uberCompressorConfig; + private UberCompressorSerdeField[] fields; + private long rawDataSize; + private SerDeStats serdeStats = new SerDeStats(); + + // serialization related + private BytesRefArrayWritable serializeCache; + private ByteStream.Output byteStream = new ByteStream.Output(); + + + @Override + public void initialize(Configuration _unused, Properties tbl) throws SerDeException { + uberCompressorConfig = new UberCompressorConfig(tbl); + } + + private void initForSerialization() { + if (serializeCache == null) { + UberCompressorConfig.setInstance(uberCompressorConfig); + int size = uberCompressorConfig.getNumberOfColumns(); + serializeCache = new BytesRefArrayWritable(size); + BytesRefWritable[] field = new BytesRefWritable[size]; + fields = new UberCompressorSerdeField[size]; + for (int i = 0; i < size; i++) { + field[i] = new BytesRefWritable(); + serializeCache.set(i, field[i]); + fields[i] = new UberCompressorSerdeField(uberCompressorConfig.getColumnType(i), true); + } + } + } + + private void initForDeserialization() { + if (fields == null) { + int size = uberCompressorConfig.getNumberOfColumns(); + fields = new UberCompressorSerdeField[size]; + for (int i = 0; i < size; i++) { + fields[i] = new UberCompressorSerdeField(uberCompressorConfig.getColumnType(i), false); + } + } + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + initForSerialization(); + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + objInspector.getTypeName()); + } + + StructObjectInspector soi = (StructObjectInspector) objInspector; + List sfields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + + byteStream.reset(); + int streamOffset = 0; + // Serialize each field + for (int i = 0; i < sfields.size(); i++) { + ObjectInspector foi = sfields.get(i).getFieldObjectInspector(); + Object f = (list == 
null ? null : list.get(i)); + try { + if (f != null) { + fields[i].writeObject(f, foi, byteStream); + byteStream.flush(); + serializeCache.get(i).set(byteStream.getData(), + streamOffset, + byteStream.getCount() - streamOffset); + streamOffset = byteStream.getCount(); + } else { + serializeCache.get(i).set(byteStream.getData(), streamOffset, 0); + } + } catch (IOException e) { + throw new SerDeException(e); + } + } + rawDataSize = byteStream.getCount(); + return serializeCache; + } + + @Override + public Object deserialize(Writable blob) throws SerDeException { + initForDeserialization(); + if (!(blob instanceof BytesRefArrayWritable)) { + throw new SerDeException(getClass().toString() + ": expects BytesRefArrayWritable!"); + } + try { + BytesRefArrayWritable cols = (BytesRefArrayWritable) blob; + StandardStructObjectInspector soi = (StandardStructObjectInspector) getObjectInspector(); + List sfields = soi.getAllStructFieldRefs(); + Object struct = soi.create(); + rawDataSize = 0; + for (int index = 0; index < sfields.size(); ++index) { + StructField sf = sfields.get(index); + BytesRefWritable brw = cols.get(index); + int length = brw.getLength(); + rawDataSize += length; + if (length == 0) { + soi.setStructFieldData(struct, sf, null); + } else { + Object rc = + fields[index].readObject(brw.getData(), brw.getStart(), length).getFirst(); + soi.setStructFieldData(struct, sf, rc); + } + } + return struct; + } catch (IOException e) { + throw new SerDeException(e); + } + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return uberCompressorConfig.getObjectInspector(); + } + + @Override + public SerDeStats getSerDeStats() { + serdeStats.setRawDataSize(rawDataSize); + return serdeStats; + } + + @Override + public Class getSerializedClass() { + return BytesRefArrayWritable.class; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerdeField.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerdeField.java new file mode 100644 index 0000000..7d2642e --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorSerdeField.java @@ -0,0 +1,95 @@ +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Properties; + +import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +class UberCompressorSerdeField { + private static SerDeParameters serdeParams; + static { + Properties props = new Properties(); + props.put(Constants.ESCAPE_CHAR, "\n"); + try { + serdeParams = LazySimpleSerDe.initSerdeParams(null, props, "uber"); + } catch (SerDeException e) { + throw new RuntimeException(e); + } + } + // for reading only + private Tuple, Integer> readRV; + private ByteArrayRef bar; + + UberCompressorSerdeField(TypeInfo typeInfo, boolean onlyWriting) { + if (!onlyWriting) { + ObjectInspector oi = 
getObjectInspector(typeInfo); + LazyObject lo = LazyFactory.createLazyObject(oi); + readRV = new Tuple, Integer>(lo, 0); + bar = new ByteArrayRef(); + } + } + + private static HashMap cachedOIs = + new HashMap(); + + static ObjectInspector getObjectInspector(TypeInfo ti) { + ObjectInspector oi = cachedOIs.get(ti); + if (oi == null) { + oi = createObjectInspector(ti); + cachedOIs.put(ti, oi); + } + return oi; + } + + private static ObjectInspector createObjectInspector(TypeInfo ti) { + return LazyFactory.createLazyObjectInspector(ti, + serdeParams.getSeparators(), 1, serdeParams.getNullSequence(), serdeParams.isEscaped(), + serdeParams.getEscapeChar()); + } + + public void writeObject(Object f, ObjectInspector foi, Output byteStream) throws IOException { + if (f == null) { // nulls are handled via a 0-length value + return; + } + LazySimpleSerDe.serialize(byteStream, f, foi, serdeParams.getSeparators(), 1, + serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(), + serdeParams.getNeedsEscape()); + byteStream.write(serdeParams.getSeparators()[0]); + } + + public Tuple, Integer> readObject(byte[] data, int start, int length) { + byte sep = serdeParams.getSeparators()[0]; + if (length == -1) { + int datalength = data.length; + for (int index = start; index < datalength; ++index) { + if (data[index] == sep) { + length = index - start; + break; + } + } + if (length == -1) { + return null; + } + } else { + length -= 1; // seperator not part of object data + } + bar.setData(data); + LazyObject lo = readRV.getFirst(); + lo.init(bar, start, length); + readRV.setFirst(lo); + readRV.setSecond(length + 1); // consume seperator also + return readRV; + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorUtils.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorUtils.java new file mode 100644 index 0000000..5e0aa67 --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/UberCompressorUtils.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
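The field encoding used by the serde and the output stream is worth spelling out: writeObject emits the value in LazySimpleSerDe's text form and appends the first LazySimpleSerDe separator byte; readObject scans for that separator (length == -1 means "find it yourself"), wraps the bytes before it in a lazy object, and reports the bytes consumed, separator included; nulls are represented as zero-length slices at the serde level. A rough sketch of a round trip, assuming it sits in the same package since UberCompressorSerdeField is package-private; the class name is mine.

package org.apache.hadoop.hive.contrib.ubercompressor;

import java.io.IOException;

import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class FieldEncodingSketch {
  public static void main(String[] args) throws IOException {
    // Write two ints; each value is followed by the separator byte.
    UberCompressorSerdeField writeField =
        new UberCompressorSerdeField(TypeInfoFactory.intTypeInfo, true);
    ObjectInspector javaIntOI =
        TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(TypeInfoFactory.intTypeInfo);
    ByteStream.Output out = new ByteStream.Output();
    writeField.writeObject(Integer.valueOf(7), javaIntOI, out);
    writeField.writeObject(Integer.valueOf(42), javaIntOI, out);

    // Read them back; getFirst() is a LazyObject over the bytes, getSecond() is
    // the number of bytes consumed, separator included.
    UberCompressorSerdeField readField =
        new UberCompressorSerdeField(TypeInfoFactory.intTypeInfo, false);
    int start = 0;
    Tuple t = readField.readObject(out.getData(), start, -1);
    while (t != null) {
      System.out.println("value spans " + t.getSecond() + " byte(s) at offset " + start);
      start += ((Integer) t.getSecond()).intValue();
      t = readField.readObject(out.getData(), start, -1);
    }
  }
}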
+ */ + +package org.apache.hadoop.hive.contrib.ubercompressor; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + + +public class UberCompressorUtils { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(UberCompressorUtils.class); + + private UberCompressorUtils() { + } + + public static void writeInt(OutputStream byteStream, int v) throws IOException { + byteStream.write((v >>> 24) & 0xFF); + byteStream.write((v >>> 16) & 0xFF); + byteStream.write((v >>> 8) & 0xFF); + byteStream.write((v >>> 0) & 0xFF); + } + + public static int readInt(InputStream byteStream) throws IOException { + int rc = 0; + for (int j = 0; j < 4; ++j) { + int b = byteStream.read(); + if (b == -1) { + throw new EOFException(); + } + rc = (rc << 8) | (b & 0xFF); + } + return rc; + } +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/DummyIntegerCompressor.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/DummyIntegerCompressor.java new file mode 100644 index 0000000..a6448cd --- /dev/null +++ contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/compressors/DummyIntegerCompressor.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
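A tiny round trip through the two helpers above, which encode an int as four big-endian bytes (the same layout as DataOutputStream.writeInt) and signal truncation with EOFException, the EOF signal that DummyIntegerCompressor's reader, shown next, relies on. Illustrative only; the class name and sample values are mine.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorUtils;

public class IntRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    UberCompressorUtils.writeInt(out, 1712634731);
    UberCompressorUtils.writeInt(out, -42);  // negative values survive the round trip

    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    System.out.println(UberCompressorUtils.readInt(in)); // 1712634731
    System.out.println(UberCompressorUtils.readInt(in)); // -42
    // A third readInt() would throw EOFException: the stream is exhausted.
  }
}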
+ */ +package org.apache.hadoop.hive.contrib.ubercompressor.compressors; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hive.contrib.ubercompressor.InputReader; +import org.apache.hadoop.hive.contrib.ubercompressor.OutputWriter; +import org.apache.hadoop.hive.contrib.ubercompressor.TypeSpecificCompressor; +import org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorUtils; +import org.apache.hadoop.hive.contrib.ubercompressor.dsalg.Tuple; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; + +public class DummyIntegerCompressor implements TypeSpecificCompressor { + + private static class MyOutputWriter implements OutputWriter { + private OutputStream mOut; + private IntObjectInspector mOI; + + MyOutputWriter(OutputStream out, ObjectInspector oi) { + mOut = out; + Category c = oi.getCategory(); + switch (c) { + case PRIMITIVE: + PrimitiveCategory pc = ((PrimitiveObjectInspector) oi).getPrimitiveCategory(); + switch (pc) { + case INT: + mOI = (IntObjectInspector) oi; + break; + default: + throw new IllegalArgumentException( + "DummyIntegerCompressor passed invalid objectInspector of type:" + oi.getTypeName()); + } + break; + default: + throw new IllegalArgumentException( + "DummyIntegerCompressor passed invalid objectInspector of type:" + oi.getTypeName()); + } + } + + @Override + public void write(Object t) throws IOException { + Integer i = (Integer) mOI.getPrimitiveJavaObject(t); + UberCompressorUtils.writeInt(mOut, i); + } + + @Override + public long close(boolean closeUnderlying) throws IOException { + if (closeUnderlying) { + mOut.close(); + } else { + mOut.flush(); + } + return -1; + } + } + + private static class MyInputReader implements InputReader { + + private InputStream mIn; + private AbstractPrimitiveJavaObjectInspector mOI; + + public MyInputReader(InputStream in) { + mIn = in; + mOI = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.INT); + } + + @Override + public Tuple read() throws IOException { + try { + return new Tuple(false, UberCompressorUtils.readInt(mIn)); + } catch (EOFException e) { + return new Tuple(true, 0); + } + } + + @Override + public void close(boolean closeUnderLying) throws IOException { + if (closeUnderLying) { + mIn.close(); + } + } + + @Override + public ObjectInspector getObjectInspector() { + return mOI; + } + + } + + @Override + public OutputWriter createOutputWriter(OutputStream out, ObjectInspector oi) throws IOException { + return new MyOutputWriter(out, oi); + } + + @Override + public InputReader createInputReader(InputStream in, long numBits) throws IOException { + return new MyInputReader(in); + } + +} diff --git contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/dsalg/Tuple.java contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/dsalg/Tuple.java new file mode 100644 index 0000000..29d1e8b --- /dev/null +++ 
contrib/src/java/org/apache/hadoop/hive/contrib/ubercompressor/dsalg/Tuple.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.contrib.ubercompressor.dsalg; + +/** + * + * Tuple. A simple pair class + * + * @param + * @param + */ +public class Tuple { + private T1 mT1; + private T2 mT2; + + public Tuple(T1 t1, T2 t2) { + mT1 = t1; + mT2 = t2; + } + + public Tuple() { + } + + public T1 getFirst() { + return mT1; + } + + public T2 getSecond() { + return mT2; + } + + public void setFirst(T1 t1) { + mT1 = t1; + } + + public void setSecond(T2 t2) { + mT2 = t2; + } +} diff --git contrib/src/test/queries/clientpositive/ubercompressor.q contrib/src/test/queries/clientpositive/ubercompressor.q new file mode 100644 index 0000000..80f636f --- /dev/null +++ contrib/src/test/queries/clientpositive/ubercompressor.q @@ -0,0 +1,19 @@ +ADD JAR ${system:build.dir}/hive-contrib-${system:hive.version}.jar; + +set hive.exec.compress.output=true; +set mapred.output.compression.codec=org.apache.hadoop.hive.contrib.ubercompressor.UberCompressionCodec; +set hive.ubercompressor.config.column.1=codec:org.apache.hadoop.io.compress.GzipCodec; +set hive.ubercompressor.config.column.2=codec:org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; +set hive.ubercompressor.config.column.3=compressor:org.apache.hadoop.hive.contrib.ubercompressor.compressors.DummyIntegerCompressor; +set hive.ubercompressor.config.column.4=codec:org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; + +CREATE TABLE table_ubercompressor(a array, b array, c map, d int, e string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorSerde' +STORED AS RCFILE; + +FROM src_thrift +INSERT OVERWRITE TABLE table_ubercompressor SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring; + +SELECT * FROM table_ubercompressor; + diff --git contrib/src/test/results/clientpositive/ubercompressor.q.out contrib/src/test/results/clientpositive/ubercompressor.q.out new file mode 100644 index 0000000..97272d0 --- /dev/null +++ contrib/src/test/results/clientpositive/ubercompressor.q.out @@ -0,0 +1,50 @@ +PREHOOK: query: CREATE TABLE table_ubercompressor(a array, b array, c map, d int, e string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorSerde' +STORED AS RCFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE table_ubercompressor(a array, b array, c map, d int, e string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.contrib.ubercompressor.UberCompressorSerde' +STORED AS RCFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_ubercompressor 
+PREHOOK: query: FROM src_thrift +INSERT OVERWRITE TABLE table_ubercompressor SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring +PREHOOK: type: QUERY +PREHOOK: Input: default@src_thrift +PREHOOK: Output: default@table_ubercompressor +POSTHOOK: query: FROM src_thrift +INSERT OVERWRITE TABLE table_ubercompressor SELECT src_thrift.lint, src_thrift.lstring, src_thrift.mstringstring, src_thrift.aint, src_thrift.astring +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_thrift +POSTHOOK: Output: default@table_ubercompressor +POSTHOOK: Lineage: table_ubercompressor.a SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lint, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.b SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lstring, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.c SIMPLE [(src_thrift)src_thrift.FieldSchema(name:mstringstring, type:map, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.d SIMPLE [(src_thrift)src_thrift.FieldSchema(name:aint, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.e SIMPLE [(src_thrift)src_thrift.FieldSchema(name:astring, type:string, comment:from deserializer), ] +PREHOOK: query: SELECT * FROM table_ubercompressor +PREHOOK: type: QUERY +PREHOOK: Input: default@table_ubercompressor +PREHOOK: Output: file:/var/folders/67/67R3POPtF90VG63KSmCbcU++F0U/-Tmp-/krishnak/hive_2011-12-17_02-25-30_447_7253195807291359174/-mr-10000 +POSTHOOK: query: SELECT * FROM table_ubercompressor +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_ubercompressor +POSTHOOK: Output: file:/var/folders/67/67R3POPtF90VG63KSmCbcU++F0U/-Tmp-/krishnak/hive_2011-12-17_02-25-30_447_7253195807291359174/-mr-10000 +POSTHOOK: Lineage: table_ubercompressor.a SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lint, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.b SIMPLE [(src_thrift)src_thrift.FieldSchema(name:lstring, type:array, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.c SIMPLE [(src_thrift)src_thrift.FieldSchema(name:mstringstring, type:map, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.d SIMPLE [(src_thrift)src_thrift.FieldSchema(name:aint, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: table_ubercompressor.e SIMPLE [(src_thrift)src_thrift.FieldSchema(name:astring, type:string, comment:from deserializer), ] +[0,0,0] ["0","0","0"] {"key_0":"value_0"} 1712634731 record_0 +[1,2,3] ["10","100","1000"] {"key_1":"value_1"} 465985200 record_1 +[2,4,6] ["20","200","2000"] {"key_2":"value_2"} -751827638 record_2 +[3,6,9] ["30","300","3000"] {"key_3":"value_3"} 477111222 record_3 +[4,8,12] ["40","400","4000"] {"key_4":"value_4"} -734328909 record_4 +[5,10,15] ["50","500","5000"] {"key_5":"value_5"} -1952710710 record_5 +[6,12,18] ["60","600","6000"] {"key_6":"value_6"} 1244525190 record_6 +[7,14,21] ["70","700","7000"] {"key_7":"value_7"} -1461153973 record_7 +[8,16,24] ["80","800","8000"] {"key_8":"value_8"} 1638581578 record_8 +[9,18,27] ["90","900","9000"] {"key_9":"value_9"} 336964413 record_9 +null null null 0 NULL
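For reference, the per-column knobs exercised by ubercompressor.q can also be set programmatically; the key prefix is hive.ubercompressor.config.column.<index> (COMPRESSION_CONF in UberCompressionOutputStream), and columns without an entry fall back to the BZip2 default. A hedged sketch; the class and method names are mine, and the surrounding job-setup context is assumed.

import org.apache.hadoop.conf.Configuration;

public class UberCompressorJobSettings {
  public static Configuration configure(Configuration conf) {
    // Route Hive's output compression through the schema-aware codec...
    conf.setBoolean("hive.exec.compress.output", true);
    conf.set("mapred.output.compression.codec",
        "org.apache.hadoop.hive.contrib.ubercompressor.UberCompressionCodec");

    // ...and pick a strategy per column index, mirroring the settings in the test above.
    conf.set("hive.ubercompressor.config.column.1",
        "codec:org.apache.hadoop.io.compress.GzipCodec");
    conf.set("hive.ubercompressor.config.column.3",
        "compressor:org.apache.hadoop.hive.contrib.ubercompressor.compressors.DummyIntegerCompressor");
    return conf;
  }
}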