diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0cc8de0e66..97e6192c35 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1685,6 +1685,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime\n" +
         "would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."),
+    HIVE_OPTIMIZE_TOPN_KEY("hive.optimize.topn.key", false, "Whether to enable top n key optimizer."),
+
     HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
         "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" +
         "and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."),
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index ab3cd43158..73290787ba 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -280,6 +280,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   tez_union_multiinsert.q,\
   tez_vector_dynpart_hashjoin_1.q,\
   tez_vector_dynpart_hashjoin_2.q,\
+  topn_optimization.q,\
   union2.q,\
   union3.q,\
   union4.q,\
diff --git ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
index a002348013..f8328be25c 100644
--- ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
+++ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
@@ -37,7 +37,8 @@
   ORCFILEMERGE(22),
   RCFILEMERGE(23),
   MERGEJOIN(24),
-  SPARKPRUNINGSINK(25);
+  SPARKPRUNINGSINK(25),
+  TOPNKEY(26);
 
   private final int value;
 
@@ -110,6 +111,8 @@ public static OperatorType findByValue(int value) {
         return MERGEJOIN;
       case 25:
         return SPARKPRUNINGSINK;
+      case 26:
+        return TOPNKEY;
       default:
         return null;
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
index 775c737bba..d233fb077f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
@@ -158,13 +158,13 @@ private void deepCopyElements(Object[] keys,
     }
   }
 
-  transient Object[] singleEleArray = new Object[1];
   transient StringObjectInspector soi_new, soi_copy;
 
   class TextKeyWrapper extends KeyWrapper {
     int hashcode;
     Object key;
     boolean isCopy;
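+    // The buffer moves from factory scope into the wrapper, so each
+    // TextKeyWrapper owns its own scratch array instead of sharing one
+    // mutable array across all wrappers created by the factory.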
+    transient Object[] singleEleArray = new Object[1];
 
     public TextKeyWrapper(boolean isCopy) {
       this(-1, null, isCopy);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index e665064133..4894e70cef 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
 import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator;
@@ -76,6 +77,7 @@
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
 import org.apache.hadoop.hive.ql.plan.VectorDesc;
@@ -126,6 +128,7 @@
     opvec.put(OrcFileMergeDesc.class, OrcFileMergeOperator.class);
     opvec.put(CommonMergeJoinDesc.class, CommonMergeJoinOperator.class);
     opvec.put(ListSinkDesc.class, ListSinkOperator.class);
+    opvec.put(TopNKeyDesc.class, TopNKeyOperator.class);
   }
 
   static {
@@ -143,6 +146,7 @@
     vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class);
     vectorOpvec.put(PTFDesc.class, VectorPTFOperator.class);
     vectorOpvec.put(SparkHashTableSinkDesc.class, VectorSparkHashTableSinkOperator.class);
+    vectorOpvec.put(TopNKeyDesc.class, VectorTopNKeyOperator.class);
   }
 
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
new file mode 100644
index 0000000000..1c857e90ef
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.vector.AbstractTopNKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
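+/**
+ * Row-mode top-n key filter. The operator keeps a bounded priority queue of
+ * the distinct key tuples seen so far, buffers incoming rows in batches of
+ * VectorizedRowBatch.DEFAULT_SIZE, and on each flush forwards only the
+ * buffered rows whose keys are still in the queue. The filter is best-effort;
+ * the downstream ReduceSink/Limit still enforces the exact result.
+ */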
+public class TopNKeyOperator extends AbstractTopNKeyOperator implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private transient KeyWrapper keyWrapper;
+  private transient KeyWrapper standardKeyWrapper;
+
+  private transient int rowLimit;
+  private transient int rowSize;
+  private transient Object[] rows;
+  private transient ObjectInspector standardObjectInspector;
+
+  public TopNKeyOperator() {
+    super();
+  }
+
+  public TopNKeyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public static class KeyWrapperComparator implements Comparator<KeyWrapper> {
+    private ObjectInspector[] objectInspectors1;
+    private ObjectInspector[] objectInspectors2;
+    private boolean[] columnSortOrderIsDesc;
+
+    public KeyWrapperComparator(ObjectInspector[] objectInspectors1, ObjectInspector[]
+        objectInspectors2, boolean[] columnSortOrderIsDesc) {
+      this.objectInspectors1 = objectInspectors1;
+      this.objectInspectors2 = objectInspectors2;
+      this.columnSortOrderIsDesc = columnSortOrderIsDesc;
+    }
+
+    @Override
+    public int compare(KeyWrapper key1, KeyWrapper key2) {
+      return ObjectInspectorUtils.compare(key1.getKeyArray(), objectInspectors1,
+          key2.getKeyArray(), objectInspectors2, columnSortOrderIsDesc);
+    }
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    keyWrapper = new KeyWrapperFactory(keyFields, keyObjectInspectors,
+        standardKeyObjectInspectors).getKeyWrapper();
+    standardKeyWrapper = new KeyWrapperFactory(standardKeyFields, standardKeyObjectInspectors,
+        standardKeyObjectInspectors).getKeyWrapper();
+
+    rowLimit = VectorizedRowBatch.DEFAULT_SIZE;
+    rows = new Object[rowLimit];
+    rowSize = 0;
+
+    standardObjectInspector = ObjectInspectorUtils.getStandardObjectInspector(
+        inputObjInspectors[0]);
+    outputObjInspector = standardObjectInspector;
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+    keyWrapper.getNewKey(row, inputObjInspectors[0]);
+    keyWrapper.setHashKey();
+
+    if (!priorityQueue.contains(keyWrapper)) {
+      priorityQueue.offer(keyWrapper.copyKey());
+    }
+    if (priorityQueue.size() > topN) {
+      priorityQueue.poll();
+    }
+
+    rows[rowSize] = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[0]);
+    rowSize++;
+
+    if (rowSize % rowLimit == 0) {
+      processRows();
+    }
+  }
+
+  private void processRows() throws HiveException {
+    for (int i = 0; i < rowSize; i++) {
+      Object row = rows[i];
+
+      standardKeyWrapper.getNewKey(row, standardObjectInspector);
+      standardKeyWrapper.setHashKey();
+
+      if (priorityQueue.contains(standardKeyWrapper)) {
+        forward(row, standardObjectInspector);
+      }
+    }
+    rowSize = 0;
+  }
+
+  @Override
+  protected final void closeOp(boolean abort) throws HiveException {
+    processRows();
+    super.closeOp(abort);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AbstractTopNKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AbstractTopNKeyOperator.java
new file mode 100644
index 0000000000..787dfb4826
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AbstractTopNKeyOperator.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+
+import java.util.PriorityQueue;
+
+import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
+
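+/**
+ * Base class for the row-mode top-n key operator: wires up the key expression
+ * evaluators and object inspectors (against both the original and the standard
+ * row layout) and creates the bounded priority queue whose comparator honors
+ * the ReduceSink column sort order.
+ */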
+public abstract class AbstractTopNKeyOperator extends Operator<TopNKeyDesc> {
+
+  protected transient int topN;
+  protected transient PriorityQueue<KeyWrapper> priorityQueue;
+  protected transient ExprNodeEvaluator[] keyFields;
+  protected transient ObjectInspector[] keyObjectInspectors;
+  protected transient ExprNodeEvaluator[] standardKeyFields;
+  protected transient ObjectInspector[] standardKeyObjectInspectors;
+
+  public AbstractTopNKeyOperator() {
+    super();
+  }
+
+  public AbstractTopNKeyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    this.topN = conf.getTopN();
+
+    String columnSortOrder = conf.getColumnSortOrder();
+    boolean[] columnSortOrderIsDesc = new boolean[columnSortOrder.length()];
+    for (int i = 0; i < columnSortOrderIsDesc.length; i++) {
+      columnSortOrderIsDesc[i] = (columnSortOrder.charAt(i) == '-');
+    }
+
+    ObjectInspector rowInspector = inputObjInspectors[0];
+    ObjectInspector standardRowInspector =
+        ObjectInspectorUtils.getStandardObjectInspector(rowInspector);
+
+    // init keyFields
+    int numKeys = conf.getKeyColumns().size();
+    keyFields = new ExprNodeEvaluator[numKeys];
+    keyObjectInspectors = new ObjectInspector[numKeys];
+    standardKeyFields = new ExprNodeEvaluator[numKeys];
+    standardKeyObjectInspectors = new ObjectInspector[numKeys];
+
+    for (int i = 0; i < numKeys; i++) {
+      ExprNodeDesc key = conf.getKeyColumns().get(i);
+      keyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
+      keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
+      standardKeyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
+      standardKeyObjectInspectors[i] = standardKeyFields[i].initialize(standardRowInspector);
+    }
+
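+    // Capacity hint topN + 1: process() offers a key first and then polls the
+    // head once the size exceeds topN, so at most topN candidates are retained.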
+    priorityQueue = new PriorityQueue<>(topN + 1, new TopNKeyOperator.KeyWrapperComparator(
+        standardKeyObjectInspectors, standardKeyObjectInspectors, columnSortOrderIsDesc));
+  }
+
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "TOPNKEY";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return TOPNKEY;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
new file mode 100644
index 0000000000..6922099362
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
+
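+/**
+ * Vectorized top-n key filter: the key columns of each row are serialized with
+ * BinarySortableSerDe (which encodes the sort order), the top-n serialized
+ * keys are tracked in a priority queue, and each batch's selected vector is
+ * narrowed to the rows whose keys survive.
+ */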
+public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements VectorizationOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorTopNKeyDesc vectorTopNKeyDesc;
+  private VectorizationContext vContext;
+
+  private transient boolean firstBatch;
+  private transient Object[] singleRow;
+  private transient VectorExtractRow vectorExtractRow;
+  private transient List<Integer> keyCols;
+
+  private transient PriorityQueue<Writable> priorityQueue;
+  private transient BinarySortableSerDe binarySortableSerDe;
+  private transient List<TypeInfo> columnTypes;
+  private transient StructObjectInspector keyObjectInspector;
+
+  public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) {
+    this(ctx);
+    this.conf = (TopNKeyDesc) conf;
+    this.vContext = vContext;
+    this.vectorTopNKeyDesc = (VectorTopNKeyDesc) vectorDesc;
+  }
+
+  /** Kryo ctor. */
+  @VisibleForTesting
+  public VectorTopNKeyOperator() {
+    super();
+  }
+
+  public VectorTopNKeyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    VectorExpression[] keyExpressions = vectorTopNKeyDesc.getKeyExpressions();
+
+    this.firstBatch = true;
+    this.keyCols = new ArrayList<>();
+    this.columnTypes = new ArrayList<>();
+
+    List<ObjectInspector> fieldObjectInspectors = new ArrayList<>();
+
+    for (VectorExpression keyExpression : keyExpressions) {
+      this.keyCols.add(keyExpression.getOutputColumnNum());
+      this.columnTypes.add(keyExpression.getOutputTypeInfo());
+      fieldObjectInspectors.add(TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+          keyExpression.getOutputTypeInfo()));
+    }
+
+    keyObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+        conf.getKeyColumnNames(), fieldObjectInspectors);
+  }
+
+  @Override
+  public void process(Object data, int tag) throws HiveException {
+    VectorizedRowBatch batch = (VectorizedRowBatch) data;
+
+    if (firstBatch) {
+      vectorExtractRow = new VectorExtractRow();
+      vectorExtractRow.init(keyObjectInspector, keyCols);
+
+      singleRow = new Object[vectorExtractRow.getCount()];
+
+      priorityQueue = new PriorityQueue<>();
+
+      try {
+        binarySortableSerDe = new BinarySortableSerDe();
+        Properties properties = new Properties();
+        properties.setProperty(serdeConstants.LIST_COLUMNS,
+            Joiner.on(',').join(conf.getKeyColumnNames()));
+        properties.setProperty(serdeConstants.LIST_COLUMN_TYPES,
+            Joiner.on(',').join(columnTypes));
+        properties.setProperty(serdeConstants.SERIALIZATION_SORT_ORDER,
+            conf.getColumnSortOrder());
+        binarySortableSerDe.initialize(getConfiguration(), properties);
+      } catch (SerDeException e) {
+        throw new HiveException(e);
+      }
+
+      firstBatch = false;
+    }
+
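+    // Two passes over the batch: the first offers every row's serialized key
+    // to the bounded queue, the second keeps only the rows whose key is still
+    // in the queue once the whole batch has been seen.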
+    // Get top n keys
+    for (int i = 0; i < batch.size; i++) {
+
+      // Get keys
+      if (batch.selectedInUse) {
+        vectorExtractRow.extractRow(batch, batch.selected[i], singleRow);
+      } else {
+        vectorExtractRow.extractRow(batch, i, singleRow);
+      }
+
+      Writable writable;
+      try {
+        writable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
+      } catch (SerDeException e) {
+        throw new HiveException(e);
+      }
+
+      // Put the copied keys into the priority queue
+      if (!priorityQueue.contains(writable)) {
+        priorityQueue.offer(WritableUtils.clone(writable, getConfiguration()));
+      }
+
+      // Limit the queue size
+      if (priorityQueue.size() > conf.getTopN()) {
+        priorityQueue.poll();
+      }
+    }
+
+    // Filter rows with top n keys
+    int size = 0;
+    int[] selected = new int[batch.selected.length];
+    for (int i = 0; i < batch.size; i++) {
+
+      // Get keys
+      if (batch.selectedInUse) {
+        vectorExtractRow.extractRow(batch, batch.selected[i], singleRow);
+      } else {
+        vectorExtractRow.extractRow(batch, i, singleRow);
+      }
+
+      Writable writable;
+      try {
+        writable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
+      } catch (SerDeException e) {
+        throw new HiveException(e);
+      }
+
+      // Select a row in the priority queue
+      if (priorityQueue.contains(writable)) {
+        selected[size++] = i;
+      }
+    }
+
+    // Apply selection to batch
+    if (batch.size != size) {
+      batch.selectedInUse = true;
+      batch.selected = selected;
+      batch.size = size;
+    }
+
+    // Forward the result
+    forward(batch, null, true);
+  }
+
+  @Override
+  public String getName() {
+    return AbstractTopNKeyOperator.getOperatorName();
+  }
+
+  @Override
+  public OperatorType getType() {
+    return TOPNKEY;
+  }
+
+  @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorTopNKeyDesc;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
new file mode 100644
index 0000000000..1e934161fb
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/TopNKeyProcessor.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper.transform;
+
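+/**
+ * Node processor that fires on every ReduceSinkOperator. For a sorted reduce
+ * sink with a positive top-n value it walks back up the operator stack through
+ * GroupBy, Forward and Select operators, remapping the key columns at each
+ * step, and inserts a TopNKey operator as early in the plan as the keys can
+ * still be traced.
+ */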
+public class TopNKeyProcessor implements NodeProcessor {
+
+  private List<ExprNodeDesc> keyCols;
+  private List<String> outputKeyColumnNames;
+  private RowSchema rowSchema;
+
+  public TopNKeyProcessor() {
+  }
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+
+    // Get RS
+    ReduceSinkOperator reduceSinkOperator = (ReduceSinkOperator) nd;
+    ReduceSinkDesc reduceSinkDesc = reduceSinkOperator.getConf();
+
+    // Only an ordered reduce sink with a top-n limit qualifies
+    if (!reduceSinkDesc.isOrdering() || reduceSinkDesc.getTopN() < 0) {
+      return null;
+    }
+
+    keyCols = reduceSinkDesc.getKeyCols();
+    outputKeyColumnNames = reduceSinkDesc.getOutputKeyColumnNames();
+    rowSchema = reduceSinkOperator.getSchema();
+    String order = reduceSinkDesc.getOrder();
+
+    int depth = stack.size() - 1;
+
+    while (pushdownTopNKey((Operator<? extends OperatorDesc>) stack.get(depth - 1))) {
+      depth--;
+    }
+
+    Operator<? extends OperatorDesc> parentOperator =
+        (Operator<? extends OperatorDesc>) stack.get(depth - 1);
+    Operator<? extends OperatorDesc> currentOperator =
+        (Operator<? extends OperatorDesc>) stack.get(depth);
+
+    // Insert top n key operator between the current operator and the parent operator
+    TopNKeyDesc topNKeyDesc = new TopNKeyDesc(reduceSinkDesc.getTopN(), keyCols, outputKeyColumnNames, order);
+    makeOpBetween(topNKeyDesc, new RowSchema(rowSchema), parentOperator, currentOperator);
+
+    return null;
+  }
+
+  private List<ExprNodeDesc> mapKeyCols(Map<String, ExprNodeDesc> mapping,
+      List<ExprNodeDesc> keyCols) {
+    List<ExprNodeDesc> keyColsInOperator = new ArrayList<>();
+    for (ExprNodeDesc keyCol : keyCols) {
+      String keyExprString = keyCol.getExprString();
+      if (mapping.containsKey(keyExprString)) {
+        keyColsInOperator.add(mapping.get(keyExprString));
+      }
+    }
+    return keyColsInOperator;
+  }
+
+  private boolean pushdownTopNKey(Operator<? extends OperatorDesc> parentOperator) {
+    if (parentOperator instanceof GroupByOperator) {
+      GroupByOperator groupByOperator = (GroupByOperator) parentOperator;
+      GroupByDesc groupByDesc = groupByOperator.getConf();
+
+      List<ExprNodeDesc> mappedKeyCols = mapKeyCols(groupByDesc.getColumnExprMap(), keyCols);
+      if (!transform(groupByDesc.getKeys()).containsAll(transform(mappedKeyCols))) {
+        return false;
+      }
+
+      keyCols = mappedKeyCols;
+      rowSchema = groupByOperator.getSchema();
+      return true;
+    } else if (parentOperator instanceof ForwardOperator) {
+      // Forward
+      return true;
+    } else if (parentOperator instanceof SelectOperator) {
+      // The vectorized select operator handles this differently and is efficient enough.
+
+      SelectOperator selectOperator = (SelectOperator) parentOperator;
+      SelectDesc selectDesc = selectOperator.getConf();
+
+      List<ExprNodeDesc> mappedKeyCols = mapKeyCols(selectDesc.getColumnExprMap(), keyCols);
+      if (!transform(selectDesc.getColList()).containsAll(transform(mappedKeyCols))) {
+        return false;
+      }
+
+      keyCols = mappedKeyCols;
+      rowSchema = selectOperator.getSchema();
+      return true;
+    }
+    return false;
+  }
+
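+  /**
+   * Splices the new operator in front of currentOperator: the new operator
+   * adopts currentOperator's former parents, takes currentOperator as its
+   * child, and parentOperator drops its direct edge to currentOperator.
+   */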
+  private static void makeOpBetween(OperatorDesc newOperatorDesc, RowSchema rowSchema,
+      Operator<? extends OperatorDesc> parentOperator,
+      Operator<? extends OperatorDesc> currentOperator) {
+
+    // PARENT -> (NEW, (CURRENT -> CHILD))
+    Operator<? extends OperatorDesc> newOperator = OperatorFactory.getAndMakeChild(
+        currentOperator.getCompilationOpContext(), newOperatorDesc, rowSchema,
+        currentOperator.getParentOperators());
+
+    // PARENT -> NEW -> CURRENT -> CHILD
+    newOperator.getChildOperators().add(currentOperator);
+    currentOperator.getParentOperators().add(newOperator);
+    parentOperator.removeChild(currentOperator);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 263d2c7281..6f12b1cce9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -137,6 +137,7 @@
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
@@ -149,6 +150,7 @@
 import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
 import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.VectorSparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.VectorLimitDesc;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
 import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc;
@@ -4015,6 +4017,20 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
         vContext, vectorSelectDesc);
   }
 
+  private static Operator<? extends OperatorDesc> vectorizeTopNKeyOperator(
+      Operator<? extends OperatorDesc> topNKeyOperator, VectorizationContext vContext,
+      VectorTopNKeyDesc vectorTopNKeyDesc) throws HiveException {
+
+    TopNKeyDesc topNKeyDesc = (TopNKeyDesc) topNKeyOperator.getConf();
+
+    vectorTopNKeyDesc.setKeyExpressions(vContext.getVectorExpressions(
+        topNKeyDesc.getKeyColumns()));
+
+    return OperatorFactory.getVectorOperator(
+        topNKeyOperator.getCompilationOpContext(), topNKeyDesc,
+        vContext, vectorTopNKeyDesc);
+  }
+
   private static void fillInPTFEvaluators(
       List<WindowFunctionDef> windowsFunctions,
       String[] evaluatorFunctionNames,
@@ -4572,6 +4588,13 @@ private static VectorPTFInfo createVectorPTFInfo(Operator
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs, Set outputs) @@
     OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs);
 
     perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    runTopNKeyOptimization(procCtx);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run top n optimization");
+
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
     // setup dynamic partition pruning where possible
     runDynamicPartitionPruning(procCtx, inputs, outputs);
     perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Setup dynamic partition pruning");
@@ -996,6 +1001,21 @@ private void removeSemiJoinIfNoStats(OptimizeTezProcContext procCtx)
     ogw.startWalking(topNodes, null);
   }
 
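+  /**
+   * Walks the operator tree once and applies {@link TopNKeyProcessor} to every
+   * ReduceSinkOperator, inserting top-n key filters where the sink is sorted
+   * and limited.
+   */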
dynamic partition pruning"); @@ -996,6 +1001,21 @@ private void removeSemiJoinIfNoStats(OptimizeTezProcContext procCtx) ogw.startWalking(topNodes, null); } + private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx) + throws SemanticException { + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("Top N optimization", ReduceSinkOperator.getOperatorName() + "%"), + new TopNKeyProcessor()); + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx); + List topNodes = new ArrayList(); + topNodes.addAll(procCtx.parseContext.getTopOps().values()); + GraphWalker ogw = new PreOrderOnceWalker(disp); + ogw.startWalking(topNodes, null); + } + private boolean findParallelSemiJoinBranch(Operator mapjoin, TableScanOperator bigTableTS, ParseContext parseContext, Map semijoins) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java new file mode 100644 index 0000000000..1ac3e388f5 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TopNKeyDesc.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +import java.util.ArrayList; +import java.util.List; + +/** + * TopNKeyDesc. 
+ */
+@Explain(displayName = "Top N Key Operator",
+    explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+
+public class TopNKeyDesc extends AbstractOperatorDesc {
+  private static final long serialVersionUID = 1L;
+
+  private int topN;
+  private List<ExprNodeDesc> keyColumns;
+  private List<String> keyColumnNames;
+  private String columnSortOrder;
+
+  public TopNKeyDesc() {
+  }
+
+  public TopNKeyDesc(
+      final int topN,
+      final List<ExprNodeDesc> keyColumns,
+      final List<String> keyColumnNames,
+      final String columnSortOrder) {
+    this.topN = topN;
+    this.keyColumns = keyColumns;
+    this.keyColumnNames = keyColumnNames;
+    this.columnSortOrder = columnSortOrder;
+  }
+
+  @Explain(displayName = "keys")
+  public String getKeyString() {
+    return PlanUtils.getExprListString(keyColumns);
+  }
+
+  @Explain(displayName = "keys", explainLevels = { Level.USER })
+  public String getUserLevelExplainKeyString() {
+    return PlanUtils.getExprListString(keyColumns, true);
+  }
+
+  public List<ExprNodeDesc> getKeyColumns() {
+    return keyColumns;
+  }
+
+  public void setKeyColumns(final ArrayList<ExprNodeDesc> keyColumns) {
+    this.keyColumns = keyColumns;
+  }
+
+  public int getTopN() {
+    return topN;
+  }
+
+  public void setTopN(int topN) {
+    this.topN = topN;
+  }
+
+  public String getColumnSortOrder() {
+    return columnSortOrder;
+  }
+
+  public void setColumnSortOrder(String columnSortOrder) {
+    this.columnSortOrder = columnSortOrder;
+  }
+
+  @Override
+  public boolean isSame(OperatorDesc other) {
+    if (getClass().getName().equals(other.getClass().getName())) {
+      TopNKeyDesc otherDesc = (TopNKeyDesc) other;
+      return getTopN() == otherDesc.getTopN() &&
+          columnSortOrder.equals(otherDesc.columnSortOrder) &&
+          keyColumnNames.equals(otherDesc.keyColumnNames) &&
+          keyColumns.equals(otherDesc.keyColumns);
+    }
+    return false;
+  }
+
+  public List<String> getKeyColumnNames() {
+    return keyColumnNames;
+  }
+
+  public void setKeyColumnNames(List<String> keyColumnNames) {
+    this.keyColumnNames = keyColumnNames;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java
new file mode 100644
index 0000000000..23cb1cc6ad
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorTopNKeyDesc.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+
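+/**
+ * Vectorization counterpart of TopNKeyDesc; carries the compiled
+ * VectorExpressions for the key columns.
+ */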
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; + +public class VectorTopNKeyDesc extends AbstractVectorDesc { + private static final long serialVersionUID = 1L; + + private VectorExpression[] keyExpressions; + + public VectorTopNKeyDesc() { + } + + public void setKeyExpressions(VectorExpression[] keyExpressions) { + this.keyExpressions = keyExpressions; + } + + public VectorExpression[] getKeyExpressions() { + return keyExpressions; + } +} diff --git ql/src/test/queries/clientpositive/topn_key.q ql/src/test/queries/clientpositive/topn_key.q new file mode 100644 index 0000000000..98ae52f87c --- /dev/null +++ ql/src/test/queries/clientpositive/topn_key.q @@ -0,0 +1,21 @@ +set hive.mapred.mode=nonstrict; +set mapred.reduce.tasks=31; +set hive.optimize.topn.key=true; + +-- SORT_QUERY_RESULTS + +CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE; + +EXPLAIN +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5; + +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5; + +SELECT dest1.* FROM dest1; + +EXPLAIN +SELECT src.key FROM src GROUP BY src.key LIMIT 5; + +SELECT src.key FROM src GROUP BY src.key LIMIT 5; diff --git ql/src/test/queries/clientpositive/vector_topn_key.q ql/src/test/queries/clientpositive/vector_topn_key.q new file mode 100644 index 0000000000..9762dcf73c --- /dev/null +++ ql/src/test/queries/clientpositive/vector_topn_key.q @@ -0,0 +1,22 @@ +set hive.mapred.mode=nonstrict; +set mapred.reduce.tasks=31; +set hive.vectorized.execution.enabled=true; +set hive.optimize.topn.key=true; + +-- SORT_QUERY_RESULTS + +CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE; + +explain vectorization +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5; + +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5; + +SELECT dest1.* FROM dest1; + +explain vectorization +SELECT src.key FROM src GROUP BY src.key LIMIT 5; + +SELECT src.key FROM src GROUP BY src.key LIMIT 5; diff --git ql/src/test/results/clientpositive/llap/topn_key.q.out ql/src/test/results/clientpositive/llap/topn_key.q.out new file mode 100644 index 0000000000..ce02b560b6 --- /dev/null +++ ql/src/test/results/clientpositive/llap/topn_key.q.out @@ -0,0 +1,221 @@ +PREHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator 
Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Top N Key Operator + keys: key (type: string) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string), substr(value, 5) (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(_col1) + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE + TopN Hash Memory Usage: 0.1 + value expressions: _col1 (type: double) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 23750 Basic stats: COMPLETE Column stats: COMPLETE + Limit + Number of rows: 5 + Statistics: Num rows: 5 Data size: 475 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 5 Data size: 475 Basic stats: COMPLETE Column stats: COMPLETE + TopN Hash Memory Usage: 0.1 + value expressions: _col0 (type: string), _col1 (type: double) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: string), VALUE._col1 (type: double) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 5 Data size: 475 Basic stats: COMPLETE Column stats: COMPLETE + Limit + Number of rows: 5 + Statistics: Num rows: 5 Data size: 475 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: UDFToInteger(_col0) (type: int), _col1 (type: double) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-3 + Stats Work + Basic Stats Work: + +PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.value 
EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SELECT dest1.* FROM dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.* FROM dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +0 0.0 +10 10.0 +100 200.0 +103 206.0 +104 208.0 +PREHOOK: query: EXPLAIN +SELECT src.key FROM src GROUP BY src.key LIMIT 5 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT src.key FROM src GROUP BY src.key LIMIT 5 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Top N Key Operator + keys: key (type: string) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: key (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 21750 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 250 Data size: 21750 Basic stats: COMPLETE Column stats: COMPLETE + TopN Hash Memory Usage: 0.1 + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 21750 Basic stats: COMPLETE Column stats: COMPLETE + Limit + Number of rows: 5 + Statistics: Num rows: 5 Data size: 435 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 5 Data size: 435 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 5 + Processor Tree: + ListSink + +PREHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 +10 +100 +103 +104 diff --git ql/src/test/results/clientpositive/llap/vector_topn_key.q.out ql/src/test/results/clientpositive/llap/vector_topn_key.q.out new file mode 100644 index 0000000000..5282816e35 --- /dev/null +++ ql/src/test/results/clientpositive/llap/vector_topn_key.q.out @@ -0,0 +1,137 @@ +PREHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: EXPLAIN +FROM src 
INSERT OVERWRITE TABLE dest1 SELECT src.key, src.key v GROUP BY src.key LIMIT 5 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE dest1 SELECT src.key, src.key v GROUP BY src.key LIMIT 5 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Top N Key Operator + keys: key (type: string) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: key (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 21750 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 250 Data size: 21750 Basic stats: COMPLETE Column stats: COMPLETE + TopN Hash Memory Usage: 0.1 + Execution mode: vectorized, llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 21750 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string), _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Limit + Number of rows: 5 + Statistics: Num rows: 5 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 5 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + TopN Hash Memory Usage: 0.1 + value expressions: _col0 (type: string), _col1 (type: string) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: string), VALUE._col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 5 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Limit + Number of rows: 5 + Statistics: Num rows: 5 Data size: 870 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: UDFToInteger(_col0) (type: int), UDFToDouble(_col1) (type: double) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 5 Data size: 60 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat 
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-3 + Stats Work + Basic Stats Work: + +PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 SELECT src.key, src.key v GROUP BY src.key ORDER BY src.key LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 SELECT src.key, src.key v GROUP BY src.key ORDER BY src.key LIMIT 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.value EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: SELECT dest1.* FROM dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.* FROM dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +0 0.0 +10 10.0 +100 100.0 +103 103.0 +104 104.0 diff --git ql/src/test/results/clientpositive/tez/topn_key.q.out ql/src/test/results/clientpositive/tez/topn_key.q.out new file mode 100644 index 0000000000..5ebfb7e7f4 --- /dev/null +++ ql/src/test/results/clientpositive/tez/topn_key.q.out @@ -0,0 +1,128 @@ +PREHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5 +POSTHOOK: type: QUERY +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.dest1"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 3 + File Output Operator [FS_12] + table:{"name:":"default.dest1"} + Select Operator [SEL_11] (rows=5 width=12) + Output:["_col0","_col1"] + Limit [LIM_10] (rows=5 width=95) + Number of rows:5 + Select Operator [SEL_9] (rows=5 width=95) + Output:["_col0","_col1"] + <-Reducer 2 [CUSTOM_SIMPLE_EDGE] + PARTITION_ONLY_SHUFFLE [RS_8] + Limit [LIM_7] (rows=5 width=95) + Number of rows:5 + Group By Operator [GBY_5] (rows=250 width=95) + Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] + SHUFFLE [RS_4] + PartitionCols:_col0 + Group By Operator [GBY_3] (rows=250 width=95) + Output:["_col0","_col1"],aggregations:["sum(_col1)"],keys:_col0 + Select Operator [SEL_1] (rows=500 width=178) + Output:["_col0","_col1"] + Top N Key Operator [TOPNKEY_13] (rows=500 width=178) + keys:key + TableScan [TS_0] (rows=500 width=178) + default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"] + +PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SELECT dest1.* FROM dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.* FROM dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +0 0.0 +10 10.0 +100 200.0 +103 206.0 +104 208.0 +PREHOOK: query: EXPLAIN +SELECT src.key FROM src GROUP BY src.key LIMIT 5 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT src.key FROM src GROUP BY src.key LIMIT 5 +POSTHOOK: type: QUERY +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:5 + Stage-1 + Reducer 2 + File Output Operator [FS_7] + Limit [LIM_6] (rows=5 width=87) + Number of rows:5 + Group By Operator [GBY_4] (rows=250 width=87) + Output:["_col0"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] + SHUFFLE [RS_3] + PartitionCols:_col0 + Group By Operator [GBY_2] (rows=250 width=87) + Output:["_col0"],keys:key + Select Operator [SEL_1] (rows=500 width=87) + Output:["key"] + Top N Key Operator [TOPNKEY_8] (rows=500 width=87) + keys:key + TableScan [TS_0] (rows=500 width=87) + default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key"] + +PREHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 +10 +100 +103 +104 diff --git ql/src/test/results/clientpositive/tez/vector_topn_key.q.out ql/src/test/results/clientpositive/tez/vector_topn_key.q.out new file mode 100644 index 0000000000..071ebe737b --- /dev/null +++ ql/src/test/results/clientpositive/tez/vector_topn_key.q.out @@ -0,0 +1,128 @@ +PREHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: explain vectorization +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5 +PREHOOK: type: QUERY +POSTHOOK: query: explain vectorization +FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5 +POSTHOOK: type: QUERY +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.dest1"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 3 vectorized + File Output Operator [FS_22] + table:{"name:":"default.dest1"} + Select Operator [SEL_21] (rows=5 width=12) + Output:["_col0","_col1"] + Limit [LIM_20] (rows=5 width=95) + Number of rows:5 + Select Operator [SEL_19] (rows=5 width=95) + Output:["_col0","_col1"] + <-Reducer 2 [CUSTOM_SIMPLE_EDGE] vectorized + PARTITION_ONLY_SHUFFLE [RS_18] + Limit [LIM_17] (rows=5 width=95) + Number of rows:5 + Group By Operator [GBY_16] (rows=250 width=95) + Output:["_col0","_col1"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] + SHUFFLE [RS_4] + PartitionCols:_col0 + Group By Operator [GBY_3] (rows=250 width=95) + Output:["_col0","_col1"],aggregations:["sum(_col1)"],keys:_col0 + Select Operator [SEL_1] (rows=500 width=178) + Output:["_col0","_col1"] + Top N Key Operator [TOPNKEY_13] (rows=500 width=178) + keys:key + TableScan [TS_0] (rows=500 width=178) + default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"] + +PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 +SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SELECT dest1.* FROM dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.* FROM dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +NULL 0.0 +NULL 10.0 +NULL 200.0 +NULL 206.0 +NULL 208.0 +PREHOOK: query: explain vectorization +SELECT src.key FROM src GROUP BY src.key LIMIT 5 +PREHOOK: type: QUERY +POSTHOOK: query: explain vectorization +SELECT src.key FROM src GROUP BY src.key LIMIT 5 +POSTHOOK: type: QUERY +Plan optimized by CBO. 
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE)
+
+Stage-0
+  Fetch Operator
+    limit:5
+    Stage-1
+      Reducer 2 vectorized
+      File Output Operator [FS_15]
+        Limit [LIM_14] (rows=5 width=87)
+          Number of rows:5
+          Group By Operator [GBY_13] (rows=250 width=87)
+            Output:["_col0"],keys:KEY._col0
+          <-Map 1 [SIMPLE_EDGE] vectorized
+            SHUFFLE [RS_12]
+              PartitionCols:_col0
+              Group By Operator [GBY_11] (rows=250 width=87)
+                Output:["_col0"],keys:key
+                Select Operator [SEL_10] (rows=500 width=87)
+                  Output:["key"]
+                  Top N Key Operator [TOPNKEY_9] (rows=500 width=87)
+                    keys:key
+                    TableScan [TS_0] (rows=500 width=87)
+                      default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key"]
+
+PREHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+92
+95
+96
+97
+98
diff --git ql/src/test/results/clientpositive/topn_key.q.out ql/src/test/results/clientpositive/topn_key.q.out
new file mode 100644
index 0000000000..44fd9dc923
--- /dev/null
+++ ql/src/test/results/clientpositive/topn_key.q.out
@@ -0,0 +1,200 @@
+PREHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dest1
+POSTHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dest1
+PREHOOK: query: EXPLAIN
+FROM src INSERT OVERWRITE TABLE dest1
+SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+FROM src INSERT OVERWRITE TABLE dest1
+SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string), substr(value, 5) (type: string)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: sum(_col1)
+                keys: _col0 (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  TopN Hash Memory Usage: 0.1
+                  value expressions: _col1 (type: double)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: sum(VALUE._col0)
+          keys: KEY._col0 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 5
+            Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              sort order: 
+              Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+              TopN Hash Memory Usage: 0.1
+              value expressions: _col0 (type: string), _col1 (type: double)
+      Reduce Operator Tree:
+        Select Operator
+          expressions: VALUE._col0 (type: string), VALUE._col1 (type: double)
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 5
+            Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    name: default.dest1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.dest1
+
+  Stage: Stage-3
+    Stats Work
+      Basic Stats Work:
+
+PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1
+SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1
+POSTHOOK: query: FROM src INSERT OVERWRITE TABLE dest1
+SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1
+POSTHOOK: Lineage: dest1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: SELECT dest1.* FROM dest1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT dest1.* FROM dest1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1
+#### A masked pattern was here ####
+0	0.0
+10	10.0
+100	200.0
+103	206.0
+104	208.0
+PREHOOK: query: EXPLAIN
+SELECT src.key FROM src GROUP BY src.key LIMIT 5
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT src.key FROM src GROUP BY src.key LIMIT 5
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string)
+              outputColumnNames: key
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                keys: key (type: string)
+                mode: hash
+                outputColumnNames: _col0
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  TopN Hash Memory Usage: 0.1
+      Reduce Operator Tree:
+        Group By Operator
+          keys: KEY._col0 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 5
+            Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 5
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0
+10
+100
+103
+104
diff --git ql/src/test/results/clientpositive/vector_topn_key.q.out ql/src/test/results/clientpositive/vector_topn_key.q.out
new file mode 100644
index 0000000000..1288b00127
--- /dev/null
+++ ql/src/test/results/clientpositive/vector_topn_key.q.out
@@ -0,0 +1,202 @@
+PREHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dest1
+POSTHOOK: query: CREATE TABLE dest1(key INT, value DOUBLE) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dest1
+PREHOOK: query: EXPLAIN
+FROM src INSERT OVERWRITE TABLE dest1
+SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+FROM src INSERT OVERWRITE TABLE dest1
+SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key LIMIT 5
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string), substr(value, 5) (type: string)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: sum(_col1)
+                keys: _col0 (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  TopN Hash Memory Usage: 0.1
+                  value expressions: _col1 (type: double)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: sum(VALUE._col0)
+          keys: KEY._col0 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 5
+            Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              sort order: 
+              Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+              TopN Hash Memory Usage: 0.1
+              value expressions: _col0 (type: string), _col1 (type: double)
+      Execution mode: vectorized
+      Reduce Operator Tree:
+        Select Operator
+          expressions: VALUE._col0 (type: string), VALUE._col1 (type: double)
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 5
+            Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    name: default.dest1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.dest1
+
+  Stage: Stage-3
+    Stats Work
+      Basic Stats Work:
+
+PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1
+SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1
+POSTHOOK: query: FROM src INSERT OVERWRITE TABLE dest1
+SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key ORDER BY src.key LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@dest1
+POSTHOOK: Lineage: dest1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest1.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: SELECT dest1.* FROM dest1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT dest1.* FROM dest1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest1
+#### A masked pattern was here ####
+0	0.0
+10	10.0
+100	200.0
+103	206.0
+104	208.0
+PREHOOK: query: EXPLAIN
+SELECT src.key FROM src GROUP BY src.key LIMIT 5
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT src.key FROM src GROUP BY src.key LIMIT 5
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string)
+              outputColumnNames: key
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                keys: key (type: string)
+                mode: hash
+                outputColumnNames: _col0
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  TopN Hash Memory Usage: 0.1
+      Execution mode: vectorized
+      Reduce Operator Tree:
+        Group By Operator
+          keys: KEY._col0 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 5
+            Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 5
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT src.key FROM src GROUP BY src.key LIMIT 5
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0
+10
+100
+103
+104
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 698ebe1ffe..8bec504597 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -783,6 +783,30 @@ public static int compare(Object[] o1, ObjectInspector[] oi1, Object[] o2,
     return 0;
   }
 
+  /**
+   * Compares two key arrays field by field, taking a per-column sort
+   * direction into account: the raw comparison result is returned for
+   * columns marked descending and negated otherwise.
+   */
+  public static int compare(Object[] o1, ObjectInspector[] oi1, Object[] o2,
+      ObjectInspector[] oi2, boolean[] columnSortOrderIsDesc) {
+    assert (o1.length == oi1.length);
+    assert (o2.length == oi2.length);
+    assert (o1.length == o2.length);
+
+    for (int i = 0; i < o1.length; i++) {
+      int r = compare(o1[i], oi1[i], o2[i], oi2[i]);
+      if (r != 0) {
+        if (columnSortOrderIsDesc[i]) {
+          return r;
+        } else {
+          return -r;
+        }
+      }
+    }
+    return 0;
+  }
+
   /**
    * Whether comparison is supported for this type.
    * Currently all types that references any map are not comparable.
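
A note on the new compare() overload above: it orders composite keys while honoring each column's sort direction, which is the primitive a top-n key filter needs to decide, before the shuffle, whether a row's grouping key can still fall among the first n keys. The TopNKeyOperator implementation itself is not reproduced in this excerpt, so the following is only a minimal, self-contained sketch of the technique; the class and method names are hypothetical, and natural ordering stands in for ObjectInspectorUtils.compare(keys, inspectors, boundaryKeys, inspectors, columnSortOrderIsDesc) so the example compiles without Hive on the classpath.

import java.util.Comparator;
import java.util.TreeSet;

/** Hypothetical sketch of a top-n key filter; not the patch's actual operator. */
public class TopNKeyFilterSketch<K> {
  private final int topN;
  // At most topN distinct keys, smallest first under the supplied comparator.
  private final TreeSet<K> smallest;

  public TopNKeyFilterSketch(int topN, Comparator<K> comparator) {
    this.topN = topN;
    this.smallest = new TreeSet<>(comparator);
  }

  /** Returns true if a row with this key may still belong to the first n keys. */
  public boolean canForward(K key) {
    if (smallest.contains(key)) {
      return true;                 // same group as a key already in the top n
    }
    if (smallest.size() < topN) {
      smallest.add(key);           // still filling up: everything passes
      return true;
    }
    if (smallest.comparator().compare(key, smallest.last()) < 0) {
      smallest.add(key);           // new key displaces the current boundary
      smallest.pollLast();         // evict the old boundary key
      return true;
    }
    return false;                  // provably outside the top n; drop the row
  }

  public static void main(String[] args) {
    TopNKeyFilterSketch<Integer> filter =
        new TopNKeyFilterSketch<>(2, Comparator.naturalOrder());
    System.out.println(filter.canForward(5)); // true  (filling)
    System.out.println(filter.canForward(3)); // true  (filling)
    System.out.println(filter.canForward(9)); // false (outside top 2)
    System.out.println(filter.canForward(1)); // true  (evicts 5)
  }
}

Rows dropped this way are ones the downstream Group By/Limit pair would discard anyway, which is presumably why the plans above can insert the Top N Key Operator directly beneath the Select without changing results, while the eviction keeps the operator's state bounded at n keys regardless of input size.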