diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index c4554a7..f58a10b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
@@ -141,6 +142,8 @@
     vectorOpvec.add(new OpTuple(FileSinkDesc.class, VectorFileSinkOperator.class));
     vectorOpvec.add(new OpTuple(FilterDesc.class, VectorFilterOperator.class));
     vectorOpvec.add(new OpTuple(LimitDesc.class, VectorLimitOperator.class));
+    vectorOpvec.add(new OpTuple(SparkHashTableSinkDesc.class,
+        VectorSparkHashTableSinkOperator.class));
   }
 
   private static final class OpTuple {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index 7c67fd2..aa8808a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -53,9 +53,6 @@
 
   private final HashTableSinkOperator htsOperator;
 
-  // The position of this table
-  private byte tag;
-
   public SparkHashTableSinkOperator() {
     htsOperator = new HashTableSinkOperator();
   }
@@ -64,6 +61,7 @@ public SparkHashTableSinkOperator() {
   protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
     Collection<Future<?>> result = super.initializeOp(hconf);
     ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()];
+    byte tag = conf.getTag();
     inputOIs[tag] = inputObjInspectors[0];
     conf.setTagOrder(new Byte[]{ tag });
     htsOperator.setConf(conf);
@@ -74,13 +72,14 @@ public SparkHashTableSinkOperator() {
   @Override
   public void process(Object row, int tag) throws HiveException {
     // Ignore the tag passed in, which should be 0, not what we want
-    htsOperator.process(row, this.tag);
+    htsOperator.process(row, conf.getTag());
   }
 
   @Override
   public void closeOp(boolean abort) throws HiveException {
     try {
       MapJoinPersistableTableContainer[] mapJoinTables = htsOperator.mapJoinTables;
+      byte tag = conf.getTag();
       if (mapJoinTables == null || mapJoinTables.length < tag
           || mapJoinTables[tag] == null) {
         LOG.debug("mapJoinTable is null");
@@ -177,10 +176,6 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
     tableContainer.clear();
   }
 
-  public void setTag(byte tag) {
-    this.tag = tag;
-  }
-
   /**
    * Implements the getName function for the Node Interface.
    *
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
new file mode 100644
index 0000000..6b9ac26
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+/**
+ * Vectorized version of SparkHashTableSinkOperator
+ * Currently the implementation just delegates all the work to super class
+ *
+ * Copied from VectorFileSinkOperator
+ */
+public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorizationContext vContext;
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  private transient boolean firstBatch;
+
+  private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+
+  protected transient Object[] singleRow;
+
+  public VectorSparkHashTableSinkOperator() {
+  }
+
+  public VectorSparkHashTableSinkOperator(VectorizationContext vContext, OperatorDesc conf) {
+    super();
+    this.vContext = vContext;
+    this.conf = (SparkHashTableSinkDesc) conf;
+  }
+
+  @Override
+  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    inputObjInspectors[0] =
+        VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector) inputObjInspectors[0]);
+
+    Collection<Future<?>> result = super.initializeOp(hconf);
+    assert result.isEmpty();
+
+    firstBatch = true;
+
+    return result;
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+    VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+    if (firstBatch) {
+      vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
+      vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+
+      singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+
+      firstBatch = false;
+    }
+    vectorExtractRowDynBatch.setBatchOnEntry(batch);
+    if (batch.selectedInUse) {
+      int selected[] = batch.selected;
+      for (int logical = 0 ; logical < batch.size; logical++) {
+        int batchIndex = selected[logical];
+        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        super.process(singleRow, tag);
+      }
+    } else {
+      for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
+        vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+        super.process(singleRow, tag);
+      }
+    }
+
+    vectorExtractRowDynBatch.forgetBatchOnExit();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
index c6a43d9..03e82b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
@@ -85,6 +86,8 @@
     comparatorMapping.put(SMBMapJoinOperator.class, new SMBMapJoinOperatorComparator());
     comparatorMapping.put(LimitOperator.class, new LimitOperatorComparator());
     comparatorMapping.put(SparkHashTableSinkOperator.class, new SparkHashTableSinkOperatorComparator());
+    comparatorMapping.put(VectorSparkHashTableSinkOperator.class,
+        new SparkHashTableSinkOperatorComparator());
     comparatorMapping.put(LateralViewJoinOperator.class, new LateralViewJoinOperatorComparator());
     comparatorMapping.put(VectorGroupByOperator.class, new GroupByOperatorComparator());
     comparatorMapping.put(CommonMergeJoinOperator.class, new MapJoinOperatorComparator());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 7ebd18d..f88fd0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -390,7 +390,7 @@ private static void insertSHTS(byte tag, TableScanOperator tableScan, MapWork bi
         new ArrayList<Operator<? extends OperatorDesc>>();
     tableScanParents.add(tableScan);
     hashTableSinkOp.setParentOperators(tableScanParents);
-    hashTableSinkOp.setTag(tag);
+    hashTableSinkOp.getConf().setTag(tag);
   }
 
   private static void setMemUsage(MapJoinOperator mapJoinOp, Task task,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e7b9c73..ad47547 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -89,6 +89,7 @@
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -920,6 +921,10 @@ boolean validateMapWorkOperator(Operator op, MapWork mWo
       case EVENT:
         ret = true;
         break;
+      case HASHTABLESINK:
+        ret = op instanceof SparkHashTableSinkOperator &&
+            validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
+        break;
       default:
         ret = false;
         break;
@@ -962,6 +967,10 @@ boolean validateReduceWorkOperator(Operator op) {
       case EVENT:
         ret = true;
         break;
+      case HASHTABLESINK:
+        ret = op instanceof SparkHashTableSinkOperator &&
+            validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
+        break;
       default:
         ret = false;
         break;
@@ -1085,6 +1094,17 @@ private boolean validateMapJoinDesc(MapJoinDesc desc) {
     return true;
   }
 
+  private boolean validateSparkHashTableSinkOperator(SparkHashTableSinkOperator op) {
+    SparkHashTableSinkDesc desc = op.getConf();
+    byte tag = desc.getTag();
+    // it's essentially a MapJoinDesc
+    List<ExprNodeDesc> filterExprs = desc.getFilters().get(tag);
+    List<ExprNodeDesc> keyExprs = desc.getKeys().get(tag);
+    List<ExprNodeDesc> valueExprs = desc.getExprs().get(tag);
+    return validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER) &&
+        validateExprNodeDesc(keyExprs) && validateExprNodeDesc(valueExprs);
+  }
+
   private boolean validateReduceSinkOperator(ReduceSinkOperator op) {
     List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols();
     List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols();
@@ -1671,6 +1691,7 @@ private boolean canSpecializeMapJoin(Operator op, MapJoi
       case LIMIT:
       case EXTRACT:
       case EVENT:
+      case HASHTABLESINK:
         vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
         break;
       default:
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
index fd42959..76517e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
@@ -283,7 +283,7 @@ public Object process(Node nd, Stack stack,
       parent.replaceChild(parentRS, hashTableSinkOp);
     }
     hashTableSinkOp.setParentOperators(rsParentOps);
-    hashTableSinkOp.setTag(tag);
+    hashTableSinkOp.getConf().setTag(tag);
     return true;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
index ff32f5e..8833ae3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkHashTableSinkDesc.java
@@ -26,10 +26,21 @@
 public class SparkHashTableSinkDesc extends HashTableSinkDesc {
   private static final long serialVersionUID = 1L;
 
+  // The position of this table
+  private byte tag;
+
   public SparkHashTableSinkDesc() {
   }
 
   public SparkHashTableSinkDesc(MapJoinDesc clone) {
     super(clone);
   }
+
+  public byte getTag() {
+    return tag;
+  }
+
+  public void setTag(byte tag) {
+    this.tag = tag;
+  }
 }
diff --git a/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
index a80a20b..0ab301b 100644
--- a/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_decimal_mapjoin.q.out
@@ -102,6 +102,7 @@ STAGE PLANS:
                         1 dec (type: decimal(6,2))
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
 
   Stage: Stage-1
     Spark
diff --git a/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out b/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
index a0e6c2a..024be1c 100644
--- a/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
@@ -41,6 +41,7 @@ STAGE PLANS:
                         1 _col0 (type: int)
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
         Map 4 
             Map Operator Tree:
                 TableScan
@@ -56,6 +57,7 @@ STAGE PLANS:
                         1 _col0 (type: tinyint)
             Local Work:
              Map Reduce Local Work
+            Execution mode: vectorized
 
   Stage: Stage-1
     Spark
diff --git a/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out b/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
index 8cf1a81..a2dd910 100644
--- a/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
@@ -92,6 +92,7 @@ STAGE PLANS:
                     keys:
                       0 _col1 (type: int)
                       1 _col0 (type: int)
+            Execution mode: vectorized
 
   Stage: Stage-1
     Spark
diff --git a/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
index b6c2b35..c6c2435 100644
--- a/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vectorized_mapjoin.q.out
@@ -38,6 +38,7 @@ STAGE PLANS:
                         1 _col0 (type: int)
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
 
   Stage: Stage-1
     Spark
diff --git a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
index a25d540..7ba64b7 100644
--- a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
@@ -34,6 +34,7 @@ STAGE PLANS:
                         1 _col0 (type: tinyint)
             Local Work:
               Map Reduce Local Work
+            Execution mode: vectorized
         Map 4 
             Map Operator Tree:
                 TableScan
@@ -52,6 +53,7 @@ STAGE PLANS:
                         1 _col0 (type: smallint)
             Local Work:
              Map Reduce Local Work
+            Execution mode: vectorized
 
   Stage: Stage-1
     Spark
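
Note (illustration only, not part of the patch): the sketch below shows how the two halves of this change fit together. The table position (tag) now lives on SparkHashTableSinkDesc instead of on the operator, so both the row-mode operator and its vectorized subclass read it via conf.getTag(); and because SparkHashTableSinkDesc is registered in OperatorFactory's vectorOpvec, the Vectorizer's generic dispatch (OperatorFactory.getVectorOperator, as used in the canSpecializeMapJoin hunk above) yields a VectorSparkHashTableSinkOperator. The class and method names VectorizedSinkDispatchSketch and vectorize are hypothetical; the operator and VectorizationContext are assumed to be handed in by the Vectorizer.

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;

// Hypothetical helper, for illustration only.
public class VectorizedSinkDispatchSketch {

  static Operator<? extends OperatorDesc> vectorize(SparkHashTableSinkOperator op,
      VectorizationContext vContext) throws HiveException {
    // The tag is now carried by the descriptor; it is read here only to show
    // the new accessor that replaces the removed SparkHashTableSinkOperator.setTag().
    SparkHashTableSinkDesc desc = op.getConf();
    byte tag = desc.getTag();

    // With SparkHashTableSinkDesc registered in vectorOpvec, the generic
    // factory call returns a VectorSparkHashTableSinkOperator for this desc.
    Operator<? extends OperatorDesc> vectorOp =
        OperatorFactory.getVectorOperator(desc, vContext);
    assert vectorOp instanceof VectorSparkHashTableSinkOperator;
    return vectorOp;
  }
}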