diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java
index b573e3e..1ddcbc6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +46,8 @@
   private int keyColCount;
   private int valueColCount;
 
-  private transient int [] projectedColumns = null;
+  private transient VectorizedRowBatch outputBatch;
+  private transient int remainingColCount;
 
   public VectorExtractOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
@@ -57,26 +59,25 @@ public VectorExtractOperator() {
     super();
   }
 
-  private StructObjectInspector makeStandardStructObjectInspector(StructObjectInspector structObjectInspector) {
-    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    StructObjectInspector structInputObjInspector = (StructObjectInspector) inputObjInspectors[0];
+    List<? extends StructField> fields = structInputObjInspector.getAllStructFieldRefs();
     ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
     ArrayList<String> colNames = new ArrayList<String>();
-    for (StructField field: fields) {
-      colNames.add(field.getFieldName());
+    for (int i = keyColCount; i < fields.size(); i++) {
+      StructField field = fields.get(i);
+      String fieldName = field.getFieldName();
+
+      // Remove "VALUE." prefix.
+      int dotIndex = fieldName.indexOf(".");
+      colNames.add(fieldName.substring(dotIndex + 1));
       ois.add(field.getFieldObjectInspector());
     }
-    return ObjectInspectorFactory
+    outputObjInspector = ObjectInspectorFactory
             .getStandardStructObjectInspector(colNames, ois);
-  }
-
-  @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    outputObjInspector = inputObjInspectors[0];
-    LOG.info("VectorExtractOperator class of outputObjInspector is " + outputObjInspector.getClass().getName());
-    projectedColumns = new int [valueColCount];
-    for (int i = 0; i < valueColCount; i++) {
-      projectedColumns[i] = keyColCount + i;
-    }
+    remainingColCount = fields.size() - keyColCount;
+    outputBatch = new VectorizedRowBatch(remainingColCount);
     initializeChildren(hconf);
   }
 
@@ -86,20 +87,16 @@ public void setKeyAndValueColCounts(int keyColCount, int valueColCount) {
   }
 
   @Override
-  // Evaluate vectorized batches of rows and forward them.
+  // Remove the key columns and forward the values (and scratch columns).
  public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+    VectorizedRowBatch inputBatch = (VectorizedRowBatch) row;
 
-    // Project away the key columns...
-    int[] originalProjections = vrg.projectedColumns;
-    int originalProjectionSize = vrg.projectionSize;
-    vrg.projectionSize = valueColCount;
-    vrg.projectedColumns = this.projectedColumns;
-
-    forward(vrg, outputObjInspector);
+    // Copy references to the input columns array starting after the keys...
+    for (int i = 0; i < remainingColCount; i++) {
+      outputBatch.cols[i] = inputBatch.cols[keyColCount + i];
+    }
+    outputBatch.size = inputBatch.size;
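+    // Note: outputBatch is allocated once in initializeOp() and re-pointed at the
+    // current input batch's column vectors on each call; no per-row data is copied.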
 
-    // Revert the projected columns back, because vrg will be re-used.
-    vrg.projectionSize = originalProjectionSize;
-    vrg.projectedColumns = originalProjections;
+    forward(outputBatch, outputObjInspector);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
index e546dd1..ea32f33 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -27,16 +25,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 /**
  * File Sink operator implementation.
@@ -69,113 +58,10 @@ protected void initializeOp(Configuration hconf) throws HiveException {
 
   @Override
   public void processOp(Object data, int tag) throws HiveException {
     VectorizedRowBatch vrg = (VectorizedRowBatch)data;
-
-    Writable [] records = null;
-    boolean vectorizedSerde = false;
-    try {
-      if (serializer instanceof VectorizedSerde) {
-        recordValue = ((VectorizedSerde) serializer).serializeVector(vrg,
-            inputObjInspectors[0]);
-        records = (Writable[]) ((ObjectWritable) recordValue).get();
-        vectorizedSerde = true;
-      }
-    } catch (SerDeException e1) {
-      throw new HiveException(e1);
-    }
     for (int i = 0; i < vrg.size; i++) {
-      Writable row = null;
-      if (vectorizedSerde) {
-        row = records[i];
-      } else {
-        if (vrg.valueWriters == null) {
-          vrg.setValueWriters(this.valueWriters);
-        }
-        try {
-          row = serializer.serialize(getRowObject(vrg, i), inputObjInspectors[0]);
-        } catch (SerDeException ex) {
-          throw new HiveException(ex);
-        }
-      }
-      /* Create list bucketing sub-directory only if stored-as-directories is on. */
-      String lbDirName = null;
-      lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
-
-      FSPaths fpaths;
-
-      if (!bDynParts && !filesCreated) {
-        if (lbDirName != null) {
-          FSPaths fsp2 = lookupListBucketingPaths(lbDirName);
-        } else {
-          createBucketFiles(fsp);
-        }
-      }
-
-      try {
-        updateProgress();
-
-        // if DP is enabled, get the final output writers and prepare the real output row
-        assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct";
-
-        if (bDynParts) {
-          // copy the DP column values from the input row to dpVals
-          dpVals.clear();
-          dpWritables.clear();
-          ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts,
-              (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
-          // get a set of RecordWriter based on the DP column values
-          // pass the null value along to the escaping process to determine what the dir should be
-          for (Object o : dpWritables) {
-            if (o == null || o.toString().length() == 0) {
-              dpVals.add(dpCtx.getDefaultPartitionName());
-            } else {
-              dpVals.add(o.toString());
-            }
-          }
-          fpaths = getDynOutPaths(dpVals, lbDirName);
-
-        } else {
-          if (lbDirName != null) {
-            fpaths = lookupListBucketingPaths(lbDirName);
-          } else {
-            fpaths = fsp;
-          }
-        }
-
-        rowOutWriters = fpaths.getOutWriters();
-        // check if all record writers implement statistics. if atleast one RW
-        // doesn't implement stats interface we will fallback to conventional way
-        // of gathering stats
-        isCollectRWStats = areAllTrue(statsFromRecordWriter);
-        if (conf.isGatherStats() && !isCollectRWStats) {
-          if (statsCollectRawDataSize) {
-            SerDeStats stats = serializer.getSerDeStats();
-            if (stats != null) {
-              fpaths.getStat().addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
-            }
-          }
-          fpaths.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1);
-        }
-
-
-        if (row_count != null) {
-          row_count.set(row_count.get() + 1);
-        }
-
-        if (!multiFileSpray) {
-          rowOutWriters[0].write(row);
-        } else {
-          int keyHashCode = 0;
-          key.setHashCode(keyHashCode);
-          int bucketNum = prtner.getBucket(key, null, totalFiles);
-          int idx = bucketMap.get(bucketNum);
-          rowOutWriters[idx].write(row);
-        }
-      } catch (IOException e) {
-        throw new HiveException(e);
-      }
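+      // Materialize row i of the batch as an Object[] and delegate to the
+      // row-mode FileSinkOperator, which handles dynamic partitions, buckets
+      // and stats.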
+      Object[] row = getRowObject(vrg, i);
+      super.processOp(row, tag);
     }
   }
 
@@ -187,7 +73,7 @@ public void processOp(Object data, int tag) throws HiveException {
     }
     for (int i = 0; i < vrg.projectionSize; i++) {
       ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
-      singleRow[i] = vrg.valueWriters[i].writeValue(vectorColumn, batchIndex);
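+      // Use the operator's own value writers; the batch is not guaranteed to
+      // have writers attached on this path.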
+      singleRow[i] = valueWriters[i].writeValue(vectorColumn, batchIndex);
     }
     return singleRow;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index c832eec..9cbec1f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -1062,11 +1062,6 @@ private boolean validateExtractOperator(ExtractOperator op) {
   }
 
   private boolean validateFileSinkOperator(FileSinkOperator op) {
-    // HIVE-7557: For now, turn off dynamic partitioning to give more time to
-    // figure out how to make VectorFileSink work correctly with it...
-    if (op.getConf().getDynPartCtx() != null) {
-      return false;
-    }
     return true;
   }
 
diff --git ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
index b43f0c9..bbe0250 100644
--- ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
+++ ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
@@ -214,6 +214,7 @@ STAGE PLANS:
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -300,6 +301,7 @@
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_limit_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -368,6 +370,7 @@
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_buck_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -435,6 +438,7 @@
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_buck_sort_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -585,6 +589,7 @@
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -671,6 +676,7 @@
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_limit_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -739,6 +745,7 @@
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_buck_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -806,6 +813,7 @@
                       output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
                       serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                       name: default.over1k_part_buck_sort_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1362,6 +1370,7 @@
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1443,6 +1452,7 @@
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1531,6 +1541,7 @@
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1610,6 +1621,7 @@
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -1703,6 +1715,7 @@
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -2097,6 +2110,7 @@
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part_buck_sort2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection
@@ -2164,6 +2178,7 @@
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: default.over1k_part_buck_sort2_orc
+            Execution mode: vectorized
 
   Stage: Stage-2
     Dependency Collection