commit 9b2f17c8d0f68f09435d7b10aa4c8f102af3483d Author: Owen O'Malley Date: Sun Apr 6 23:45:08 2014 -0700 HIVE-6604. Create an adapter between ACID and vectorization. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java index 710d20c..dda9aae 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java @@ -20,7 +20,6 @@ import java.sql.Timestamp; import java.util.Arrays; -import java.util.Date; import java.util.List; import java.util.Map; @@ -92,7 +91,7 @@ public VectorColumnAssign init(VectorizedRowBatch out, T cv) { } protected void assignNull(int index) { - VectorizedBatchUtil.SetNullColIsNullValue(outCol, index); + VectorizedBatchUtil.setNullColIsNullValue(outCol, index); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java index 24dc308..8c299a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.io.IOException; import java.sql.Timestamp; import java.util.List; @@ -25,6 +26,7 @@ import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -33,6 +35,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -45,7 +48,7 @@ * @param cv * @param rowIndex */ - public static void SetNullColIsNullValue(ColumnVector cv, int rowIndex) { + public static void setNullColIsNullValue(ColumnVector cv, int rowIndex) { cv.isNull[rowIndex] = true; if (cv.noNulls) { cv.noNulls = false; @@ -56,12 +59,10 @@ public static void SetNullColIsNullValue(ColumnVector cv, int rowIndex) { * Iterates thru all the column vectors and sets noNull to * specified value. 
* - * @param valueToSet - * noNull value to set * @param batch * Batch on which noNull is set */ - public static void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) { + public static void setNoNullFields(VectorizedRowBatch batch) { for (int i = 0; i < batch.numCols; i++) { batch.cols[i].noNulls = true; } @@ -75,8 +76,11 @@ public static void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) * @param batch Vectorized batch to which the row is added at rowIndex * @throws HiveException */ - public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIndex, - VectorizedRowBatch batch) throws HiveException { + public static void addRowToBatch(Object row, StructObjectInspector oi, + int rowIndex, + VectorizedRowBatch batch, + DataOutputBuffer buffer + ) throws HiveException { List fieldRefs = oi.getAllStructFieldRefs(); // Iterate thru the cols and load the batch for (int i = 0; i < fieldRefs.size(); i++) { @@ -100,7 +104,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -111,7 +115,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -122,7 +126,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -133,7 +137,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -144,7 +148,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -155,7 +159,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -166,7 +170,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn dcv.isNull[rowIndex] = false; } else { dcv.vector[rowIndex] = Double.NaN; - SetNullColIsNullValue(dcv, rowIndex); + setNullColIsNullValue(dcv, rowIndex); } } break; @@ -177,7 +181,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn dcv.isNull[rowIndex] = false; } else { dcv.vector[rowIndex] = Double.NaN; - SetNullColIsNullValue(dcv, rowIndex); + setNullColIsNullValue(dcv, rowIndex); } } break; @@ -189,7 +193,7 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn lcv.isNull[rowIndex] = false; } else { lcv.vector[rowIndex] = 1; - SetNullColIsNullValue(lcv, rowIndex); + setNullColIsNullValue(lcv, rowIndex); } } break; @@ -198,12 +202,30 @@ public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIn if (writableCol != null) { bcv.isNull[rowIndex] = false; Text colText = (Text) writableCol; - bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength()); 
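// [Editor's note: illustrative sketch, not part of this patch.] The setRef() call above pointed
// the column vector at the Text's own backing array; because readers typically reuse that Text
// for the next row while the batch is still being filled, an earlier row's bytes get clobbered.
// Copying into a per-batch org.apache.hadoop.io.DataOutputBuffer (reset() once per batch, as the
// updated callers do) gives the vector a stable byte region. Hypothetical helper showing the
// pattern in isolation:
static void copyTextIntoVector(BytesColumnVector bcv, int rowIndex,
                               Text colText, DataOutputBuffer scratch) throws IOException {
  int start = scratch.getLength();                          // this row's bytes start here
  int length = colText.getLength();
  scratch.write(colText.getBytes(), 0, length);             // copy out of the reused Text
  bcv.setRef(rowIndex, scratch.getData(), start, length);   // reference the stable copy
  bcv.isNull[rowIndex] = false;
}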
+ int start = buffer.getLength(); + int length = colText.getLength(); + try { + buffer.write(colText.getBytes(), 0, length); + } catch (IOException ioe) { + throw new IllegalStateException("bad write", ioe); + } + bcv.setRef(rowIndex, buffer.getData(), start, length); } else { - SetNullColIsNullValue(bcv, rowIndex); + setNullColIsNullValue(bcv, rowIndex); } } break; + case DECIMAL: + DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[i]; + if (writableCol != null) { + dcv.isNull[rowIndex] = false; + HiveDecimalWritable wobj = (HiveDecimalWritable) writableCol; + dcv.vector[rowIndex].update(wobj.getHiveDecimal().unscaledValue(), + (short) wobj.getScale()); + } else { + setNullColIsNullValue(dcv, rowIndex); + } + break; default: throw new HiveException("Vectorizaton is not supported for datatype:" + poi.getPrimitiveCategory()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java index 0fd4983..d4be78d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -60,7 +61,7 @@ public VectorizedColumnarSerDe() throws SerDeException { /** * Serialize a vectorized row batch * - * @param obj + * @param vrg * Vectorized row batch to serialize * @param objInspector * The ObjectInspector for the row object @@ -220,7 +221,7 @@ public Object deserialize(Writable blob) throws SerDeException { // Ideally this should throw UnsupportedOperationException as the serde is // vectorized serde. But since RC file reader does not support vectorized reading this - // is left as it is. This function will be called from VectorizedRowBatchCtx::AddRowToBatch + // is left as it is. This function will be called from VectorizedRowBatchCtx::addRowToBatch // to deserialize the row one by one and populate the batch. Once RC file reader supports vectorized // reading this serde and be standalone serde with no dependency on ColumnarSerDe. 
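// [Editor's note: usage sketch, not part of this patch.] Callers of the widened
// addRowToBatch(row, oi, rowIndex, batch, buffer) own a single DataOutputBuffer, reset it once
// per batch, and thread it through every row, as deserializeVector() below now does. Rough
// sketch with hypothetical locals (rowsInBlob, refArray, rowOI, reuseBatch):
DataOutputBuffer buffer = new DataOutputBuffer();   // one scratch buffer per serde/reader
buffer.reset();                                     // before filling each new batch
for (int r = 0; r < rowsInBlob; ++r) {
  Object row = deserialize(refArray[r]);            // rows are still deserialized one at a time
  try {
    VectorizedBatchUtil.addRowToBatch(row, rowOI, r, reuseBatch, buffer);
  } catch (HiveException e) {
    throw new SerDeException(e);
  }
}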
return super.deserialize(blob); @@ -251,10 +252,13 @@ public void deserializeVector(Object rowBlob, int rowsInBlob, VectorizedRowBatch reuseBatch) throws SerDeException { BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob; + DataOutputBuffer buffer = new DataOutputBuffer(); for (int i = 0; i < rowsInBlob; i++) { Object row = deserialize(refArray[i]); try { - VectorizedBatchUtil.AddRowToBatch(row, (StructObjectInspector) cachedObjectInspector, i, reuseBatch); + VectorizedBatchUtil.addRowToBatch(row, + (StructObjectInspector) cachedObjectInspector, i, + reuseBatch, buffer); } catch (HiveException e) { throw new SerDeException(e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index 4d5ed40..49b8da1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; @@ -55,6 +54,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; @@ -332,14 +332,17 @@ public VectorizedRowBatch createVectorizedRowBatch() throws HiveException * Row blob (serialized version of row) * @param batch * Vectorized batch to which the row is added + * @param buffer a buffer to copy strings into * @throws HiveException * @throws SerDeException */ - public void addRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch) - throws HiveException, SerDeException + public void addRowToBatch(int rowIndex, Writable rowBlob, + VectorizedRowBatch batch, + DataOutputBuffer buffer + ) throws HiveException, SerDeException { Object row = this.deserializer.deserialize(rowBlob); - VectorizedBatchUtil.AddRowToBatch(row, this.rawRowOI, rowIndex, batch); + VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch, buffer); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index f0c0ecf..d948852 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -389,7 +389,7 @@ protected static PartitionDesc getPartitionDescFromPath( } if (partDesc == null) { throw new IOException("cannot find dir = " + dir.toString() - + " in partToPartitionInfo!"); + + " in " + pathToPartitionInfo); } return partDesc; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java index bc19fb4..597aef6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.io.RCFile.Reader; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -42,9 +43,6 @@ /** * RCFileRecordReader. - * - * @param - * @param */ public class VectorizedRCFileRecordReader implements RecordReader { @@ -59,6 +57,7 @@ private final LongWritable keyCache = new LongWritable(); private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable(); private boolean addPartitionCols = true; + private final DataOutputBuffer buffer = new DataOutputBuffer(); private static RCFileSyncCache syncCache = new RCFileSyncCache(); @@ -164,7 +163,8 @@ public boolean nextBlock() throws IOException { public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { // Reset column fields noNull values to true - VectorizedBatchUtil.SetNoNullFields(true, value); + VectorizedBatchUtil.setNoNullFields(value); + buffer.reset(); value.selectedInUse = false; for (int i = 0; i < value.numCols; i++) { value.cols[i].isRepeating = false; @@ -187,7 +187,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti in.getCurrentRow(colsCache); // Currently RCFile reader does not support reading vectorized // data. Populating the batch by adding one row at a time. - rbCtx.addRowToBatch(i, (Writable) colsCache, value); + rbCtx.addRowToBatch(i, (Writable) colsCache, value, buffer); } else { break; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 81c630a..189e6cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -992,24 +992,24 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, } OrcSplit split = (OrcSplit) inputSplit; - // TODO vectorized reader doesn't work with the new format yet - if (vectorMode) { - if (!split.getDeltas().isEmpty() || !split.isOriginal()) { - throw new IOException("Vectorization and ACID tables are incompatible." 
- ); - } - return createVectorizedReader(inputSplit, conf, reporter); - } reporter.setStatus(inputSplit.toString()); // if we are strictly old-school, just use the old code if (split.isOriginal() && split.getDeltas().isEmpty()) { - return new OrcRecordReader(OrcFile.createReader(split.getPath(), - OrcFile.readerOptions(conf)), conf, split); + if (vectorMode) { + return createVectorizedReader(inputSplit, conf, reporter); + } else { + return new OrcRecordReader(OrcFile.createReader(split.getPath(), + OrcFile.readerOptions(conf)), conf, split); + } } Options options = new Options(conf).reporter(reporter); final RowReader inner = getReader(inputSplit, options); + if (vectorMode) { + return (org.apache.hadoop.mapred.RecordReader) + new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit); + } final RecordIdentifier id = inner.createKey(); // Return a RecordReader that is compatible with the Hive 0.12 reader diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index dcb8991..46c3bcc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -305,6 +305,7 @@ public FSRecordWriter getRawRecordWriter(Path path, public void write(Writable w) throws IOException { OrcStruct orc = (OrcStruct) w; watcher.addKey( + ((IntWritable) orc.getFieldValue(OrcRecordUpdater.OPERATION)).get(), ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(), ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(), diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 6f0e328..8f17c12 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -37,10 +37,12 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; import java.util.ArrayList; import java.util.List; @@ -53,8 +55,10 @@ public static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index"; public static final String ACID_FORMAT = "_orc_acid_version"; + public static final String ACID_STATS = "hive.acid.stats"; public static final int ORC_ACID_VERSION = 0; + final static int INSERT_OPERATION = 0; final static int UPDATE_OPERATION = 1; final static int DELETE_OPERATION = 2; @@ -70,6 +74,8 @@ final static int DELTA_BUFFER_SIZE = 16 * 1024; final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024; + private static final Charset UTF8 = Charset.forName("UTF-8"); + private final AcidOutputFormat.Options options; private final Path path; private final FileSystem fs; @@ -84,6 +90,33 @@ private long insertedRows = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); + static class AcidStats { + long inserts; + long updates; + long deletes; + + AcidStats() { + // nothing + } + + AcidStats(String serialized) { + String[] parts = serialized.split(","); + inserts = Long.parseLong(parts[0]); + updates = Long.parseLong(parts[1]); + deletes = Long.parseLong(parts[2]); + } + + String serialize() { + StringBuilder builder = new StringBuilder(); + builder.append(inserts); + builder.append(","); + builder.append(updates); + builder.append(","); + 
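// [Editor's note: not part of this patch.] serialize() emits the counters as the comma-separated
// triple "inserts,updates,deletes", which the String constructor above parses back; for example
// new AcidStats("100,2,1") yields inserts == 100, updates == 2, deletes == 1. The KeyIndexBuilder
// below writes this string into the ORC user metadata under ACID_STATS ("hive.acid.stats"),
// alongside the existing "hive.acid.key.index" entry.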
builder.append(deletes); + return builder.toString(); + } + } + static Path getSideFile(Path main) { return new Path(main + "_flush_length"); } @@ -219,7 +252,7 @@ private void addEvent(int operation, long currentTransaction, this.originalTransaction.set(originalTransaction); this.rowId.set(rowId); item.setFieldValue(OrcRecordUpdater.ROW, row); - indexBuilder.addKey(originalTransaction, bucket.get(), rowId); + indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); writer.addRow(item); } @@ -323,6 +356,7 @@ Writer getWriter() { long lastTransaction; int lastBucket; long lastRowId; + AcidStats acidStats = new AcidStats(); @Override public void preStripeWrite(OrcFile.WriterContext context @@ -338,11 +372,26 @@ public void preStripeWrite(OrcFile.WriterContext context @Override public void preFooterWrite(OrcFile.WriterContext context ) throws IOException { - context.getWriter().addUserMetadata(OrcRecordUpdater.ACID_KEY_INDEX_NAME, - ByteBuffer.wrap(lastKey.toString().getBytes(utf8))); + context.getWriter().addUserMetadata(ACID_KEY_INDEX_NAME, + UTF8.encode(lastKey.toString())); + context.getWriter().addUserMetadata(ACID_STATS, + UTF8.encode(acidStats.serialize())); } - void addKey(long transaction, int bucket, long rowId) { + void addKey(int op, long transaction, int bucket, long rowId) { + switch (op) { + case INSERT_OPERATION: + acidStats.inserts += 1; + break; + case UPDATE_OPERATION: + acidStats.updates += 1; + break; + case DELETE_OPERATION: + acidStats.deletes += 1; + break; + default: + throw new IllegalArgumentException("Unknown operation " + op); + } lastTransaction = transaction; lastBucket = bucket; lastRowId = rowId; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java new file mode 100644 index 0000000..ca90fc5 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.*; + +import java.io.IOException; + +/** + * Implement a RecordReader that stitches together base and delta files to + * support tables and partitions stored in the ACID format. It works by using + * the non-vectorized ACID reader and moving the data into a vectorized row + * batch. + */ +class VectorizedOrcAcidRowReader + implements org.apache.hadoop.mapred.RecordReader { + private final AcidInputFormat.RowReader innerReader; + private final RecordIdentifier key; + private final OrcStruct value; + private final VectorizedRowBatchCtx rowBatchCtx; + private final ObjectInspector objectInspector; + private boolean needToSetPartition = true; + private final DataOutputBuffer buffer = new DataOutputBuffer(); + + VectorizedOrcAcidRowReader(AcidInputFormat.RowReader inner, + Configuration conf, + FileSplit split) throws IOException { + this.innerReader = inner; + this.key = inner.createKey(); + this.rowBatchCtx = new VectorizedRowBatchCtx(); + this.value = inner.createValue(); + this.objectInspector = inner.getObjectInspector(); + try { + rowBatchCtx.init(conf, split); + } catch (ClassNotFoundException e) { + throw new IOException("Failed to initialize context", e); + } catch (SerDeException e) { + throw new IOException("Failed to initialize context", e); + } catch (InstantiationException e) { + throw new IOException("Failed to initialize context", e); + } catch (IllegalAccessException e) { + throw new IOException("Failed to initialize context", e); + } catch (HiveException e) { + throw new IOException("Failed to initialize context", e); + } + } + + @Override + public boolean next(NullWritable nullWritable, + VectorizedRowBatch vectorizedRowBatch + ) throws IOException { + vectorizedRowBatch.reset(); + buffer.reset(); + if (!innerReader.next(key, value)) { + return false; + } + if (needToSetPartition) { + try { + rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch); + } catch (HiveException e) { + throw new IOException("Problem adding partition column", e); + } + needToSetPartition = false; + } + try { + VectorizedBatchUtil.addRowToBatch(value, + (StructObjectInspector) objectInspector, + vectorizedRowBatch.size++, vectorizedRowBatch, buffer); + while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length && + innerReader.next(key, value)) { + VectorizedBatchUtil.addRowToBatch(value, + (StructObjectInspector) objectInspector, + vectorizedRowBatch.size++, vectorizedRowBatch, buffer); + } + } catch (HiveException he) { + throw new IOException("error iterating", he); + } + return true; + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public VectorizedRowBatch createValue() { + try { + return rowBatchCtx.createVectorizedRowBatch(); + } catch (HiveException e) { 
+ throw new RuntimeException("Error creating a batch", e); + } + } + + @Override + public long getPos() throws IOException { + return innerReader.getPos(); + } + + @Override + public void close() throws IOException { + innerReader.close(); + } + + @Override + public float getProgress() throws IOException { + return innerReader.getProgress(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index 25fec62..d7f914a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -59,7 +59,6 @@ VectorizedOrcRecordReader(Reader file, Configuration conf, FileSplit fileSplit) throws IOException { List types = file.getTypes(); - // TODO fix to work with ACID Reader.Options options = new Reader.Options(); this.offset = fileSplit.getStart(); this.length = fileSplit.getLength(); @@ -93,7 +92,6 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti addPartitionCols = false; } reader.nextBatch(value); - rbCtx.convertRowBatchBlobToVectorizedBatch((Object) value, value.size, value); } catch (Exception e) { throw new RuntimeException(e); } @@ -108,13 +106,11 @@ public NullWritable createKey() { @Override public VectorizedRowBatch createValue() { - VectorizedRowBatch result = null; try { - result = rbCtx.createVectorizedRowBatch(); + return rbCtx.createVectorizedRowBatch(); } catch (HiveException e) { throw new RuntimeException("Error creating a batch", e); } - return result; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index adc8991..b3ab96f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -2069,6 +2069,9 @@ public synchronized long writeIntermediateFooter() throws IOException { flushStripe(); // write a footer if (stripesAtLastFlush != stripes.size()) { + if (callback != null) { + callback.preFooterWrite(callbackContext); + } int metaLength = writeMetadata(rawWriter.getPos()); int footLength = writeFooter(rawWriter.getPos() - metaLength); rawWriter.writeByte(writePostScript(footLength, metaLength)); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java index 80e64d9..7f3cb15 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -193,6 +194,7 @@ private void WriteRCFile(FileSystem fs, Path file, Configuration conf) private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, IOException { RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf); + DataOutputBuffer buffer = new DataOutputBuffer(); // Get object inspector StructObjectInspector oi = (StructObjectInspector) serDe @@ -204,7 +206,7 @@ private VectorizedRowBatch 
GetRowBatch() throws SerDeException, HiveException, I // Create the context VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null, null); VectorizedRowBatch batch = ctx.createVectorizedRowBatch(); - VectorizedBatchUtil.SetNoNullFields(true, batch); + VectorizedBatchUtil.setNoNullFields(batch); // Iterate thru the rows and populate the batch LongWritable rowID = new LongWritable(); @@ -213,7 +215,7 @@ private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, I BytesRefArrayWritable cols = new BytesRefArrayWritable(); reader.getCurrentRow(cols); cols.resetValid(colCount); - ctx.addRowToBatch(i, cols, batch); + ctx.addRowToBatch(i, cols, batch, buffer); } reader.close(); batch.size = 10; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 42a4bf7..065d6f7 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -27,6 +27,9 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.sql.Date; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -36,6 +39,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.TimeZone; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; @@ -47,10 +51,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.type.Decimal128; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; @@ -69,11 +78,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.hive.shims.CombineHiveKey; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; @@ -93,6 +104,225 @@ public class TestInputOutputFormat { Path workDir = new Path(System.getProperty("test.tmp.dir","target/tmp")); + static final int MILLIS_IN_DAY = 1000 * 60 * 60 * 24; + private static final SimpleDateFormat DATE_FORMAT = + new SimpleDateFormat("yyyy/MM/dd"); + private static final SimpleDateFormat TIME_FORMAT = + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); + 
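// [Editor's note: worked example, not part of this patch.] MILLIS_IN_DAY is 24 * 60 * 60 * 1000
// = 86,400,000. The BigRow constructor below computes millisUtc = x * MILLIS_IN_DAY and then
// subtracts the local zone offset; for x = 3 on a UTC-8 host (offset -28,800,000 ms) that is
// 259,200,000 - (-28,800,000) = 288,000,000 ms, i.e. local midnight of 1970/01/04. This is the
// shape testVectorizationWithAcid asserts later: dateColumn.vector[i] == i (days since the
// epoch) and timestampColumn.vector[i] == (i * MILLIS_IN_DAY - offset) * 1,000,000 nanoseconds.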
private static final TimeZone LOCAL_TIMEZONE = TimeZone.getDefault(); + + static { + TimeZone gmt = TimeZone.getTimeZone("GMT+0"); + DATE_FORMAT.setTimeZone(gmt); + TIME_FORMAT.setTimeZone(gmt); + TimeZone local = TimeZone.getDefault(); + } + + public static class BigRow implements Writable { + boolean booleanValue; + byte byteValue; + short shortValue; + int intValue; + long longValue; + float floatValue; + double doubleValue; + String stringValue; + HiveDecimal decimalValue; + Date dateValue; + Timestamp timestampValue; + + BigRow(long x) { + booleanValue = x % 2 == 0; + byteValue = (byte) x; + shortValue = (short) x; + intValue = (int) x; + longValue = x; + floatValue = x; + doubleValue = x; + stringValue = Long.toHexString(x); + decimalValue = HiveDecimal.create(x); + long millisUtc = x * MILLIS_IN_DAY; + millisUtc -= LOCAL_TIMEZONE.getOffset(millisUtc); + dateValue = new Date(millisUtc); + timestampValue = new Timestamp(millisUtc); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + throw new UnsupportedOperationException("no write"); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + throw new UnsupportedOperationException("no read"); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("bigrow{booleanValue: "); + builder.append(booleanValue); + builder.append(", byteValue: "); + builder.append(byteValue); + builder.append(", shortValue: "); + builder.append(shortValue); + builder.append(", intValue: "); + builder.append(intValue); + builder.append(", longValue: "); + builder.append(longValue); + builder.append(", floatValue: "); + builder.append(floatValue); + builder.append(", doubleValue: "); + builder.append(doubleValue); + builder.append(", stringValue: "); + builder.append(stringValue); + builder.append(", decimalValue: "); + builder.append(decimalValue); + builder.append(", dateValue: "); + builder.append(DATE_FORMAT.format(dateValue)); + builder.append(", timestampValue: "); + builder.append(TIME_FORMAT.format(timestampValue)); + builder.append("}"); + return builder.toString(); + } + } + + public static class BigRowField implements StructField { + private final int id; + private final String fieldName; + private final ObjectInspector inspector; + + BigRowField(int id, String fieldName, ObjectInspector inspector) { + this.id = id; + this.fieldName = fieldName; + this.inspector = inspector; + } + + @Override + public String getFieldName() { + return fieldName; + } + + @Override + public ObjectInspector getFieldObjectInspector() { + return inspector; + } + + @Override + public String getFieldComment() { + return null; + } + + @Override + public String toString() { + return "field " + id + " " + fieldName; + } + } + + public static class BigRowInspector extends StructObjectInspector { + static final List FIELDS = new ArrayList(); + static { + FIELDS.add(new BigRowField(0, "booleanValue", + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector)); + FIELDS.add(new BigRowField(1, "byteValue", + PrimitiveObjectInspectorFactory.javaByteObjectInspector)); + FIELDS.add(new BigRowField(2, "shortValue", + PrimitiveObjectInspectorFactory.javaShortObjectInspector)); + FIELDS.add(new BigRowField(3, "intValue", + PrimitiveObjectInspectorFactory.javaIntObjectInspector)); + FIELDS.add(new BigRowField(4, "longValue", + PrimitiveObjectInspectorFactory.javaLongObjectInspector)); + FIELDS.add(new BigRowField(5, "floatValue", + 
PrimitiveObjectInspectorFactory.javaFloatObjectInspector)); + FIELDS.add(new BigRowField(6, "doubleValue", + PrimitiveObjectInspectorFactory.javaDoubleObjectInspector)); + FIELDS.add(new BigRowField(7, "stringValue", + PrimitiveObjectInspectorFactory.javaStringObjectInspector)); + FIELDS.add(new BigRowField(8, "decimalValue", + PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector)); + FIELDS.add(new BigRowField(9, "dateValue", + PrimitiveObjectInspectorFactory.javaDateObjectInspector)); + FIELDS.add(new BigRowField(10, "timestampValue", + PrimitiveObjectInspectorFactory.javaTimestampObjectInspector)); + } + + + @Override + public List getAllStructFieldRefs() { + return FIELDS; + } + + @Override + public StructField getStructFieldRef(String fieldName) { + for(StructField field: FIELDS) { + if (field.getFieldName().equals(fieldName)) { + return field; + } + } + throw new IllegalArgumentException("Can't find field " + fieldName); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + BigRow obj = (BigRow) data; + switch (((BigRowField) fieldRef).id) { + case 0: + return obj.booleanValue; + case 1: + return obj.byteValue; + case 2: + return obj.shortValue; + case 3: + return obj.intValue; + case 4: + return obj.longValue; + case 5: + return obj.floatValue; + case 6: + return obj.doubleValue; + case 7: + return obj.stringValue; + case 8: + return obj.decimalValue; + case 9: + return obj.dateValue; + case 10: + return obj.timestampValue; + } + throw new IllegalArgumentException("No such field " + fieldRef); + } + + @Override + public List getStructFieldsDataAsList(Object data) { + BigRow obj = (BigRow) data; + List result = new ArrayList(11); + result.add(obj.booleanValue); + result.add(obj.byteValue); + result.add(obj.shortValue); + result.add(obj.intValue); + result.add(obj.longValue); + result.add(obj.floatValue); + result.add(obj.doubleValue); + result.add(obj.stringValue); + result.add(obj.decimalValue); + result.add(obj.dateValue); + result.add(obj.timestampValue); + return result; + } + + @Override + public String getTypeName() { + return "struct"; + } + + @Override + public Category getCategory() { + return Category.STRUCT; + } + } public static class MyRow implements Writable { int x; @@ -957,6 +1187,7 @@ public void testDefaultTypes() throws Exception { * explode. 
* @param workDir a local filesystem work directory * @param warehouseDir a mock filesystem warehouse directory + * @param tableName the table name * @param objectInspector object inspector for the row * @param isVectorized should run vectorized * @return a JobConf that contains the necessary information @@ -964,16 +1195,19 @@ public void testDefaultTypes() throws Exception { */ JobConf createMockExecutionEnvironment(Path workDir, Path warehouseDir, + String tableName, ObjectInspector objectInspector, boolean isVectorized ) throws IOException { + Utilities.clearWorkMap(); JobConf conf = new JobConf(); conf.set("hive.exec.plan", workDir.toString()); conf.set("mapred.job.tracker", "local"); conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized)); conf.set("fs.mock.impl", MockFileSystem.class.getName()); conf.set("mapred.mapper.class", ExecMapper.class.getName()); - Path root = new Path(warehouseDir, "table/p=0"); + Path root = new Path(warehouseDir, tableName + "/p=0"); + ((MockFileSystem) root.getFileSystem(conf)).clear(); conf.set("mapred.input.dir", root.toString()); StringBuilder columnIds = new StringBuilder(); StringBuilder columnNames = new StringBuilder(); @@ -997,6 +1231,7 @@ JobConf createMockExecutionEnvironment(Path workDir, fs.clear(); Properties tblProps = new Properties(); + tblProps.put("name", tableName); tblProps.put("serialization.lib", OrcSerde.class.getName()); tblProps.put("columns", columnNames.toString()); tblProps.put("columns.types", columnTypes.toString()); @@ -1012,7 +1247,7 @@ JobConf createMockExecutionEnvironment(Path workDir, LinkedHashMap> aliasMap = new LinkedHashMap>(); ArrayList aliases = new ArrayList(); - aliases.add("tbl"); + aliases.add(tableName); aliasMap.put(root.toString(), aliases); mapWork.setPathToAliases(aliasMap); LinkedHashMap partMap = @@ -1025,8 +1260,9 @@ JobConf createMockExecutionEnvironment(Path workDir, // write the plan out FileSystem localFs = FileSystem.getLocal(conf).getRaw(); - FSDataOutputStream planStream = - localFs.create(new Path(workDir, "map.xml")); + Path mapXml = new Path(workDir, "map.xml"); + localFs.delete(mapXml, true); + FSDataOutputStream planStream = localFs.create(mapXml); Utilities.serializePlan(mapWork, planStream, conf); planStream.close(); return conf; @@ -1046,7 +1282,7 @@ public void testVectorization() throws Exception { ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - inspector, true); + "vectorization", inspector, true); // write the orc file to the mock file system Writer writer = @@ -1093,7 +1329,7 @@ public void testVectorizationWithBuckets() throws Exception { ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - inspector, true); + "vectorBuckets", inspector, true); // write the orc file to the mock file system Writer writer = @@ -1130,23 +1366,18 @@ public void testVectorizationWithBuckets() throws Exception { // test acid with vectorization, no combine @Test public void testVectorizationWithAcid() throws Exception { - // get the object inspector for MyRow - StructObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = (StructObjectInspector) - ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } + StructObjectInspector inspector = new BigRowInspector(); JobConf conf = createMockExecutionEnvironment(workDir, new 
Path("mock:///"), - inspector, true); + "vectorizationAcid", inspector, true); // write the orc file to the mock file system Path partDir = new Path(conf.get("mapred.input.dir")); OrcRecordUpdater writer = new OrcRecordUpdater(partDir, new AcidOutputFormat.Options(conf).maximumTransactionId(10) .writingBase(true).bucket(0).inspector(inspector)); - for(int i=0; i < 10; ++i) { - writer.insert(10, new MyRow(i, 2 * i)); + for(int i=0; i < 100; ++i) { + BigRow row = new BigRow(i); + writer.insert(10, row); } WriterImpl baseWriter = (WriterImpl) writer.getWriter(); writer.close(false); @@ -1159,14 +1390,44 @@ public void testVectorizationWithAcid() throws Exception { InputSplit[] splits = inputFormat.getSplits(conf, 10); assertEquals(1, splits.length); - try { - org.apache.hadoop.mapred.RecordReader + org.apache.hadoop.mapred.RecordReader reader = inputFormat.getRecordReader(splits[0], conf, Reporter.NULL); - assertTrue("should throw here", false); - } catch (IOException ioe) { - assertEquals("java.io.IOException: Vectorization and ACID tables are incompatible.", - ioe.getMessage()); + NullWritable key = reader.createKey(); + VectorizedRowBatch value = reader.createValue(); + assertEquals(true, reader.next(key, value)); + assertEquals(100, value.count()); + LongColumnVector booleanColumn = (LongColumnVector) value.cols[0]; + LongColumnVector byteColumn = (LongColumnVector) value.cols[1]; + LongColumnVector shortColumn = (LongColumnVector) value.cols[2]; + LongColumnVector intColumn = (LongColumnVector) value.cols[3]; + LongColumnVector longColumn = (LongColumnVector) value.cols[4]; + DoubleColumnVector floatColumn = (DoubleColumnVector) value.cols[5]; + DoubleColumnVector doubleCoulmn = (DoubleColumnVector) value.cols[6]; + BytesColumnVector stringColumn = (BytesColumnVector) value.cols[7]; + DecimalColumnVector decimalColumn = (DecimalColumnVector) value.cols[8]; + LongColumnVector dateColumn = (LongColumnVector) value.cols[9]; + LongColumnVector timestampColumn = (LongColumnVector) value.cols[10]; + for(int i=0; i < 100; i++) { + assertEquals("checking boolean " + i, i % 2 == 0 ? 
1 : 0, + booleanColumn.vector[i]); + assertEquals("checking byte " + i, (byte) i, + byteColumn.vector[i]); + assertEquals("checking short " + i, (short) i, shortColumn.vector[i]); + assertEquals("checking int " + i, i, intColumn.vector[i]); + assertEquals("checking long " + i, i, longColumn.vector[i]); + assertEquals("checking float " + i, i, floatColumn.vector[i], 0.0001); + assertEquals("checking double " + i, i, doubleCoulmn.vector[i], 0.0001); + assertEquals("checking string " + i, new Text(Long.toHexString(i)), + stringColumn.getWritableObject(i)); + assertEquals("checking decimal " + i, new Decimal128(i), + decimalColumn.vector[i]); + assertEquals("checking date " + i, i, dateColumn.vector[i]); + long millis = (long) i * MILLIS_IN_DAY; + millis -= LOCAL_TIMEZONE.getOffset(millis); + assertEquals("checking timestamp " + i, millis * 1000000L, + timestampColumn.vector[i]); } + assertEquals(false, reader.next(key, value)); } // test non-vectorized, non-acid, combine @@ -1180,7 +1441,7 @@ public void testCombinationInputFormat() throws Exception { ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - inspector, false); + "combination", inspector, false); // write the orc file to the mock file system Path partDir = new Path(conf.get("mapred.input.dir")); @@ -1249,7 +1510,7 @@ public void testCombinationInputFormatWithAcid() throws Exception { ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"), - inspector, false); + "combinationAcid", inspector, false); // write the orc file to the mock file system Path partDir = new Path(conf.get("mapred.input.dir"));
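// [Editor's closing note: usage sketch, not part of this patch; hypothetical driver code modeled
// on testVectorizationWithAcid above.] With this change, OrcInputFormat routes vectorized reads
// of original, delta-free splits to the existing vectorized reader and wraps everything else in
// the new VectorizedOrcAcidRowReader, which drives the row-mode ACID reader and repacks rows
// into a VectorizedRowBatch. From the caller's side it is an ordinary
// RecordReader<NullWritable, VectorizedRowBatch>:
org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader =
    inputFormat.getRecordReader(splits[0], conf, Reporter.NULL);
NullWritable key = reader.createKey();
VectorizedRowBatch batch = reader.createValue();   // sized from the VectorizedRowBatchCtx
long rows = 0;
while (reader.next(key, batch)) {
  rows += batch.size;                              // next() refills the same batch object
}
reader.close();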