diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
new file mode 100644
index 0000000..0f3e636
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Interface for serdes that can serialize an entire VectorizedRowBatch
+ * in one call, instead of one row at a time.
+ */
+public interface VectorizedSerde {
+
+  Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
+      throws SerDeException;
+
+}
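The interface is easiest to read next to a prospective caller. Below is a minimal sketch of the intended call pattern, assuming a serde that implements VectorizedSerde (OrcSerde does, later in this patch) and a supply of VectorizedRowBatch objects; the `reader`, `writer`, `key`, `batch`, and `rowInspector` names are illustrative placeholders, not part of the patch:

    // Hypothetical write loop: each batch is serialized with one call
    // instead of once per row.
    VectorizedSerde vserde = new OrcSerde();
    while (reader.next(key, batch)) {
      Writable w = vserde.serializeVector(batch, rowInspector);
      writer.write(w);
    }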
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java
new file mode 100644
index 0000000..d8764c0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+
+public class CommonOrcInputFormat extends FileInputFormat<NullWritable, Writable>
+    implements InputFormatChecker {
+
+  OrcInputFormat oif = new OrcInputFormat();
+  VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
+
+  private static class CommonOrcRecordReader
+      implements RecordReader<NullWritable, Writable> {
+
+    final RecordReader<NullWritable, VectorizedRowBatch> vorr;
+    final RecordReader<NullWritable, OrcStruct> orr;
+
+    public CommonOrcRecordReader(RecordReader<NullWritable, VectorizedRowBatch> vorr,
+        RecordReader<NullWritable, OrcStruct> orr) {
+      this.vorr = vorr;
+      this.orr = orr;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (vorr != null) {
+        vorr.close();
+      } else {
+        orr.close();
+      }
+
+    }
+
+    @Override
+    public NullWritable createKey() {
+      if (vorr != null) {
+        return vorr.createKey();
+      } else {
+        return orr.createKey();
+      }
+    }
+
+    @Override
+    public Writable createValue() {
+      if (vorr != null) {
+        return vorr.createValue();
+      } else {
+        return orr.createValue();
+      }
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      if (vorr != null) {
+        return vorr.getPos();
+      } else {
+        return orr.getPos();
+      }
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (vorr != null) {
+        return vorr.getProgress();
+      } else {
+        return orr.getProgress();
+      }
+    }
+
+    @Override
+    public boolean next(NullWritable arg0, Writable arg1) throws IOException {
+      if (vorr != null) {
+        return vorr.next(arg0, (VectorizedRowBatch) arg1);
+      } else {
+        return orr.next(arg0, (OrcStruct) arg1);
+      }
+    }
+
+  }
+
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files)
+      throws IOException {
+    boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
+        false);
+    if (vectorPath) {
+      return voif.validateInput(fs, conf, files);
+    } else {
+      return oif.validateInput(fs, conf, files);
+    }
+  }
+
+  @Override
+  public RecordReader<NullWritable, Writable> getRecordReader(InputSplit split, JobConf conf,
+      Reporter reporter) throws IOException {
+    boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
+        false);
+    if (vectorPath) {
+      RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(split, conf,
+          reporter);
+      return new CommonOrcRecordReader(vorr, null);
+    } else {
+      RecordReader<NullWritable, OrcStruct> orr = oif.getRecordReader(split, conf, reporter);
+      return new CommonOrcRecordReader(null, orr);
+    }
+  }
+}
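Reader selection is driven entirely by the vectorization flag, so the same input format class serves both execution modes. A hedged client-side sketch (the `split` value would come from the framework; Reporter.NULL is the standard no-op reporter):

    JobConf conf = new JobConf();
    // Same key CommonOrcInputFormat checks; false (the default) selects the
    // row-mode OrcInputFormat path.
    conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true);

    CommonOrcInputFormat inputFormat = new CommonOrcInputFormat();
    RecordReader<NullWritable, Writable> rr =
        inputFormat.getRecordReader(split, conf, Reporter.NULL);
    // With the flag on, createValue() yields a VectorizedRowBatch;
    // with it off, an OrcStruct.
    Writable value = rr.createValue();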
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
index 32da4cd..579d8a7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
@@ -17,7 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -27,23 +35,19 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Properties;
-
 /**
  * A serde class for ORC.
  * It transparently passes the object to/from the ORC file reader/writer.
  */
-public class OrcSerde implements SerDe {
+public class OrcSerde implements SerDe, VectorizedSerde {
   private final OrcSerdeRow row = new OrcSerdeRow();
   private ObjectInspector inspector = null;
 
+  private VectorizedOrcSerde vos = null;
+
   final class OrcSerdeRow implements Writable {
-    private Object realRow;
-    private ObjectInspector inspector;
+    Object realRow;
+    ObjectInspector inspector;
 
     @Override
     public void write(DataOutput dataOutput) throws IOException {
@@ -129,4 +133,13 @@ public ObjectInspector getObjectInspector() throws SerDeException {
   public SerDeStats getSerDeStats() {
     return null;
   }
+
+  @Override
+  public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
+      throws SerDeException {
+    if (vos == null) {
+      vos = new VectorizedOrcSerde();
+    }
+    return vos.serialize(vrg, objInspector);
+  }
 }
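OrcSerde now serves both execution paths: row-mode callers keep using serialize(), while vectorized callers go through serializeVector(), which creates the VectorizedOrcSerde delegate lazily so non-vectorized queries never allocate it. A minimal sketch, assuming `row`, `batch`, and `inspector` are supplied by the caller:

    OrcSerde serde = new OrcSerde();
    // Row-at-a-time path, unchanged by this patch:
    Writable rowWritable = serde.serialize(row, inspector);
    // Vectorized path; the first call instantiates the delegate,
    // later calls reuse it:
    Writable batchWritable = serde.serializeVector(batch, inspector);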
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
new file mode 100644
index 0000000..c8fd293
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A vectorized MapReduce/Hive input format for ORC files.
+ */
+public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
+    implements InputFormatChecker {
+
+  private static class VectorizedOrcRecordReader
+      implements RecordReader<NullWritable, VectorizedRowBatch> {
+    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+    private final long offset;
+    private final long length;
+    private final int numColumns;
+    private float progress = 0.0f;
+    private final OrcStruct rowObj;
+    private final List<OrcProto.Type> types;
+
+    VectorizedOrcRecordReader(Reader file, Configuration conf,
+        long offset, long length) throws IOException {
+      this.reader = file.rows(offset, length,
+          findIncludedColumns(file.getTypes(), conf));
+      types = file.getTypes();
+      if (types.size() == 0) {
+        numColumns = 0;
+      } else {
+        numColumns = types.get(0).getSubtypesCount();
+      }
+      this.offset = offset;
+      this.length = length;
+      rowObj = new OrcStruct(numColumns);
+    }
+
+    @Override
+    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+
+      if (!reader.hasNext()) {
+        return false;
+      }
+      reader.nextBatch(value);
+      progress = reader.getProgress();
+      return true;
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public VectorizedRowBatch createValue() {
+      return new VectorizedRowBatch(numColumns,
+          VectorizedRowBatch.DEFAULT_SIZE);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return offset + (long) (progress * length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return progress;
+    }
+  }
+
+  public VectorizedOrcInputFormat() {
+    // just set a really small lower bound
+    setMinSplitSize(16 * 1024);
+  }
+
+  /**
+   * Recurse down into a type subtree turning on all of the sub-columns.
+   * @param types the types of the file
+   * @param result the global view of columns that should be included
+   * @param typeId the root of tree to enable
+   */
+  private static void includeColumnRecursive(List<OrcProto.Type> types,
+      boolean[] result,
+      int typeId) {
+    result[typeId] = true;
+    OrcProto.Type type = types.get(typeId);
+    int children = type.getSubtypesCount();
+    for(int i=0; i < children; ++i) {
+      includeColumnRecursive(types, result, type.getSubtypes(i));
+    }
+  }
+
+  /**
+   * Take the configuration and figure out which columns we need to include.
+   * @param types the types of the file
+   * @param conf the configuration
+   * @return true for each column that should be included
+   */
+  private static boolean[] findIncludedColumns(List<OrcProto.Type> types,
+      Configuration conf) {
+    String includedStr =
+        conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+    if (includedStr == null || includedStr.trim().length() == 0) {
+      return null;
+    } else {
+      int numColumns = types.size();
+      boolean[] result = new boolean[numColumns];
+      result[0] = true;
+      OrcProto.Type root = types.get(0);
+      List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
+      for(int i=0; i < root.getSubtypesCount(); ++i) {
+        if (included.contains(i)) {
+          includeColumnRecursive(types, result, root.getSubtypes(i));
+        }
+      }
+      // if we are filtering at least one column, return the boolean array
+      for(boolean include: result) {
+        if (!include) {
+          return result;
+        }
+      }
+      return null;
+    }
+  }
+
+  @Override
+  public RecordReader<NullWritable, VectorizedRowBatch>
+      getRecordReader(InputSplit inputSplit, JobConf conf,
+          Reporter reporter) throws IOException {
+    FileSplit fileSplit = (FileSplit) inputSplit;
+    Path path = fileSplit.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    reporter.setStatus(fileSplit.toString());
+    return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf,
+        fileSplit.getStart(), fileSplit.getLength());
+  }
+
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf,
+      ArrayList<FileStatus> files
+      ) throws IOException {
+    if (files.size() <= 0) {
+      return false;
+    }
+    for (FileStatus file : files) {
+      try {
+        OrcFile.createReader(fs, file.getPath());
+      } catch (IOException e) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
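Note that findIncludedColumns() returns null in two distinct cases: when no projection is configured and when the projection ends up selecting every column; null tells the ORC reader to read all columns. A worked example under an assumed schema (ids follow ORC's flattened type list, with the root struct at id 0):

    // Schema: struct<a:int, b:struct<c:string, d:string>>
    // Flattened types: 0 = root, 1 = a, 2 = b, 3 = c, 4 = d
    // Projecting top-level column 1 (b) enables the whole subtree under b:
    //   result = { true, false, true, true, true }
    // Projecting both 0 (a) and 1 (b) marks every flag true, so the method
    // returns null and the reader reads everything.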
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
new file mode 100644
index 0000000..2fcfc95
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A vectorized serde class for ORC.
+ * It serializes an entire VectorizedRowBatch to the ORC file writer in one call.
+ */
+public class VectorizedOrcSerde extends OrcSerde {
+  private final OrcStruct[] orcStructArray = new OrcStruct[VectorizedRowBatch.DEFAULT_SIZE];
+  private final Writable[] orcRowArray = new Writable[VectorizedRowBatch.DEFAULT_SIZE];
+  private final ObjectWritable ow = new ObjectWritable();
+  private final ObjectInspector inspector = null;
+
+  public VectorizedOrcSerde() {
+    super();
+    for (int i = 0; i < orcStructArray.length; i++) {
+      orcRowArray[i] = new OrcSerdeRow();
+    }
+  }
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector inspector) {
+    VectorizedRowBatch batch = (VectorizedRowBatch) obj;
+    for (int i = 0; i < batch.size; i++) {
+      OrcStruct ost = orcStructArray[i];
+      if (ost == null) {
+        ost = new OrcStruct(batch.numCols);
+        orcStructArray[i] = ost;
+      }
+      int index = 0;
+      if (batch.selectedInUse) {
+        index = batch.selected[i];
+      } else {
+        index = i;
+      }
+      for (int k = 0; k < batch.numCols; k++) {
+        Writable w = batch.cols[k].getWritableObject(index);
+        ost.setFieldValue(k, w);
+      }
+      OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
+      row.realRow = ost;
+      row.inspector = inspector;
+    }
+    ow.set(orcRowArray);
+    return ow;
+  }
+}
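A hedged usage sketch for the writer side: serialize() flattens one VectorizedRowBatch into an ObjectWritable wrapping an array of OrcSerdeRow objects, one per row in the batch. Here `batch` and `rowInspector` are assumed inputs, not constructed by the patch:

    VectorizedOrcSerde vos = new VectorizedOrcSerde();
    Writable out = vos.serialize(batch, rowInspector);
    // `out` wraps the full DEFAULT_SIZE row array regardless of batch.size,
    // so a consumer must honor batch.size rather than the array length.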