diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index dbd2a0f..2709e65 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1402,6 +1402,13 @@ HIVEOUTERJOINSUPPORTSFILTERS("hive.outerjoin.supports.filters", true, ""), + HIVEPARALLELORDERBYBUCKETINGNUM("hive.parallel.orderby.bucketing.num", 0, + "Generally in Hive, an order by clause is executed by a single reducer. But if the query is just for fetching,\n" + + "not for inserting into a table or partition, a fully ordered stream can be provided in the fetch task by\n" + + "merging partially sorted streams from multiple reducers.\n" + + "This configuration sets the number of reducers for the last MapReduce task of the order by.\n" + + "-1 means it will be decided by the usual calculation (based on hive.exec.reducers.bytes.per.reducer). 0 disables this."), + HIVEFETCHTASKCONVERSION("hive.fetch.task.conversion", "more", new StringSet("none", "minimal", "more"), "Some select queries can be converted to single FETCH task minimizing latency.\n" + "Currently the query should be single sourced not having any subquery and should not have\n" + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java index e2f696e..123861c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java @@ -276,7 +276,7 @@ private void unpackStructObject(ObjectInspector oi, Object o, String fName, List stats = new ArrayList(); InspectableObject packedRow; - while ((packedRow = ftOp.getNextRow()) != null) { + while ((packedRow = ftOp.fetchRow()) != null) { if (packedRow.oi.getCategory() != ObjectInspector.Category.STRUCT) { throw new HiveException("Unexpected object type encountered while unpacking row"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 0ccab02..fe10005 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -77,7 +77,7 @@ /** * FetchTask implementation. **/ -public class FetchOperator implements Serializable { +public class FetchOperator implements RowFetcher, Serializable { static final Log LOG = LogFactory.getLog(FetchOperator.class.getName()); static final LogHelper console = new LogHelper(LOG); @@ -259,7 +259,7 @@ private boolean getNextPath() throws Exception { } FileSystem fs = currPath.getFileSystem(job); if (fs.exists(currPath)) { - for (FileStatus fStat : listStatusUnderPath(fs, currPath)) { + for (FileStatus fStat : listStatusUnderPath(fs, currPath, job)) { if (fStat.getLen() > 0) { return true; } @@ -420,6 +420,14 @@ public boolean pushRow() throws IOException, HiveException { return row != null; } + public ObjectInspector setupFetchContext() throws HiveException { + return getOutputObjectInspector(); + } + + public InspectableObject fetchRow() throws IOException { + return getNextRow(); + } + protected void pushRow(InspectableObject row) throws HiveException { operator.processOp(row.o, 0); } @@ -434,7 +442,7 @@ protected void flushRow() throws HiveException { * Get the next row. The fetch context is modified appropriately.
* **/ - public InspectableObject getNextRow() throws IOException { + private InspectableObject getNextRow() throws IOException { try { while (true) { boolean opNotEOF = true; @@ -633,7 +641,7 @@ private boolean needConversion(TableDesc tableDesc, List partDesc * * @return list of file status entries */ - private FileStatus[] listStatusUnderPath(FileSystem fs, Path p) throws IOException { + static FileStatus[] listStatusUnderPath(FileSystem fs, Path p, JobConf job) throws IOException { boolean recursive = HiveConf.getBoolVar(job, HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE); // If this is in acid format always read it recursively regardless of what the jobconf says. if (!recursive && !AcidUtils.isAcid(p, job)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index c4f04cb..ddac920 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -36,7 +36,10 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; @@ -47,9 +50,8 @@ private static final long serialVersionUID = 1L; private int maxRows = 100; - private FetchOperator fetch; - private ListSinkOperator sink; private int totalRows; + private RowProcessor processor; private static transient final Log LOG = LogFactory.getLog(FetchTask.class); @@ -75,10 +77,7 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { // push down filters HiveInputFormat.pushFilters(job, ts); } - sink = work.getSink(); - fetch = new FetchOperator(work, job, source, getVirtualColumns(source)); - source.initialize(conf, new ObjectInspector[]{fetch.getOutputObjectInspector()}); - totalRows = 0; + processor = createProcessor(work, conf, job); } catch (Exception e) { // Bail out ungracefully - we should never hit @@ -88,6 +87,70 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { } } + private RowProcessor createProcessor(FetchWork work, HiveConf conf, JobConf job) + throws HiveException { + final Operator source = work.getSource(); + + RowFetcher fetcher; + if (!work.isMergeFetcher()) { + fetcher = new FetchOperator(work, job, source, getVirtualColumns(source)); + } else { + fetcher = new MergeSortingFetcher(work, job) { + private boolean initialized; + public int compare(List o1, List o2) { + return compareKeys(o1, o2); + } + @Override + public InspectableObject fetchRow() throws IOException, HiveException { + if (!initialized) { + // this is called in here because setupFetchContext() is called in Driver + // before executing plan + setupSegments(getPaths(fetchWork.getTblDir(), jobConf)); + initialized = true; + } + return super.fetchRow(); + } + @Override + public boolean pushRow() throws IOException, HiveException { + InspectableObject row = fetchRow(); + if (row != null) { + source.processOp(row.o, 0); + } else { + source.flush(); + } + return row != null; + } + }; + } + source.initialize(conf, new ObjectInspector[]{fetcher.setupFetchContext()}); + return new RowProcessor(fetcher, work.getSink()); + } + + private int 
compareKeys(List k1, List k2) { + int ret = k1.size() - k2.size(); + if (ret != 0) { + return ret; + } + for (int i = 0; i < k1.size(); i++) { + WritableComparable key_1 = (WritableComparable) k1.get(i); + WritableComparable key_2 = (WritableComparable) k2.get(i); + if (key_1 == null && key_2 == null) { + return -1; + } + if (key_1 == null) { + return -1; + } + if (key_2 == null) { + return 1; + } + ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2); + if(ret != 0) { + return ret; + } + } + return ret; + } + private List getVirtualColumns(Operator ts) { if (ts instanceof TableScanOperator && ts.getConf() != null) { return ((TableScanOperator)ts).getConf().getVirtualCols(); @@ -123,19 +186,19 @@ public void setMaxRows(int maxRows) { } public boolean fetch(List res) throws IOException, CommandNeedRetryException { - sink.reset(res); + processor.reset(res); int rowsRet = work.getLeastNumRows(); if (rowsRet <= 0) { rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, maxRows) : maxRows; } try { if (rowsRet <= 0) { - fetch.clearFetchContext(); + processor.clearFetchContext(); return false; } boolean fetched = false; - while (sink.getNumRows() < rowsRet) { - if (!fetch.pushRow()) { + while (processor.getNumRows() < rowsRet) { + if (!processor.pushRow()) { if (work.getLeastNumRows() > 0) { throw new CommandNeedRetryException(); } @@ -151,7 +214,7 @@ public boolean fetch(List res) throws IOException, CommandNeedRetryException { } catch (Exception e) { throw new IOException(e); } finally { - totalRows += sink.getNumRows(); + totalRows += processor.getNumRows(); } } @@ -175,8 +238,35 @@ public String getName() { * @throws HiveException */ public void clearFetch() throws HiveException { - if (fetch != null) { - fetch.clearFetchContext(); + if (processor != null) { + processor.clearFetchContext(); + } + } + + private class RowProcessor { + + private final RowFetcher fetcher; + private final ListSinkOperator sink; + + public RowProcessor(RowFetcher fetcher, ListSinkOperator sink) { + this.fetcher = fetcher; + this.sink = sink; + } + + public void reset(List res) { + sink.reset(res); + } + + public boolean pushRow() throws IOException, HiveException { + return fetcher.pushRow(); + } + + public int getNumRows() { + return sink.getNumRows(); + } + + public void clearFetchContext() throws HiveException { + fetcher.clearFetchContext(); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MergeSortingFetcher.java ql/src/java/org/apache/hadoop/hive/ql/exec/MergeSortingFetcher.java new file mode 100644 index 0000000..6109e09 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MergeSortingFetcher.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.PriorityQueue; + +/** + * Merges multiple partially sorted streams into a single sorted one. + * Before fetching, setupSegments() should be called (and can be called again to reuse the fetcher). + * + * setupFetchContext() builds the ObjectInspectors and evaluators for the merge keys, which are needed for sorting. + * For SMBJoin, these are already prepared in the operator and are used instead. + */ +public abstract class MergeSortingFetcher extends PriorityQueue + implements RowFetcher, Comparator> { + + protected final FetchWork fetchWork; + protected final JobConf jobConf; + + // for keeping track of the number of elements read. just for debugging + protected transient int counter; + + protected transient FetchOperator[] segments; + protected transient List keyFields; + protected transient List keyFieldOIs; + + // index of FetchOperator which is providing smallest one + protected transient Integer currentMinSegment; + protected transient ObjectPair, InspectableObject>[] keys; + + public MergeSortingFetcher(FetchWork fetchWork, JobConf jobConf) { + this.fetchWork = fetchWork; + this.jobConf = jobConf; + } + + public ObjectInspector setupFetchContext() throws HiveException { + TableDesc table = fetchWork.getTblDesc(); + ObjectInspector rowOI = getObjectInspector(table, jobConf); + List exprs = fetchWork.getMergeKeys(); + keyFieldOIs = new ArrayList(exprs.size()); + keyFields = new ArrayList(exprs.size()); + for (ExprNodeDesc expr : exprs) { + ExprNodeEvaluator evaluator = ExprNodeEvaluatorFactory.get(expr); + keyFieldOIs.add(evaluator.initialize(rowOI)); + keyFields.add(evaluator); + } + return rowOI; + } + + protected List getPaths(Path tblPath, JobConf conf) throws HiveException { + try { + FileSystem fs = tblPath.getFileSystem(conf); + List paths = new ArrayList(); + for (FileStatus status : FetchOperator.listStatusUnderPath(fs, tblPath, conf)) { + if (!status.isDir() && status.getLen() > 0) { + paths.add(status.getPath()); + } + } + return paths; + } catch (IOException e) { + throw new HiveException(e); + } + } + + protected ObjectInspector getObjectInspector(TableDesc table, JobConf conf) + throws HiveException { + try { + Deserializer serde = table.getDeserializerClass().newInstance(); + serde.initialize(conf, table.getProperties()); + return serde.getObjectInspector(); + } catch (Exception e) { + throw new HiveException(e); + } + } + + // Setup FetchOperators for reading.
This should be called just before row fetching + protected void setupSegments(List paths) throws HiveException { + int segmentLen = paths.size(); + FetchOperator[] segments = segmentsForSize(segmentLen); + for (int i = 0; i < segmentLen; i++) { + Path path = paths.get(i); + if (segments[i] == null) { + segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf)); + } + segments[i].setupContext(Arrays.asList(path)); + } + initialize(segmentLen); + for (int i = 0; i < segmentLen; i++) { + if (nextHive(i)) { + put(i); + } + } + counter = 0; + } + + @SuppressWarnings("unchecked") + private FetchOperator[] segmentsForSize(int segmentLen) { + if (segments == null || segments.length < segmentLen) { + FetchOperator[] newSegments = new FetchOperator[segmentLen]; + ObjectPair, InspectableObject>[] newKeys = new ObjectPair[segmentLen]; + if (segments != null) { + System.arraycopy(segments, 0, newSegments, 0, segments.length); + System.arraycopy(keys, 0, newKeys, 0, keys.length); + } + segments = newSegments; + keys = newKeys; + } + return segments; + } + + public void clearFetchContext() throws HiveException { + if (segments != null) { + for (FetchOperator op : segments) { + if (op != null) { + op.clearFetchContext(); + } + } + } + } + + protected final boolean lessThan(Object a, Object b) { + return compare(keys[(Integer) a].getFirst(), keys[(Integer) b].getFirst()) < 0; + } + + public InspectableObject fetchRow() throws IOException, HiveException { + if (currentMinSegment != null) { + adjustPriorityQueue(currentMinSegment); + } + Integer current = top(); + if (current == null) { + return null; + } + counter++; + return keys[currentMinSegment = current].getSecond(); + } + + private void adjustPriorityQueue(int current) throws IOException { + if (nextIO(current)) { + adjustTop(); // sort + } else { + pop(); + } + } + + // wrapping for exception handling + private boolean nextHive(int current) throws HiveException { + try { + return next(current); + } catch (IOException e) { + throw new HiveException(e); + } + } + + // wrapping for exception handling + private boolean nextIO(int current) throws IOException { + try { + return next(current); + } catch (HiveException e) { + throw new IOException(e); + } + } + + // return true if current min segment(FetchOperator) has next row + protected boolean next(int current) throws IOException, HiveException { + InspectableObject nextRow = readRow(current); + for (; nextRow != null; nextRow = readRow(current)) { + if (keys[current] == null) { + keys[current] = new ObjectPair, InspectableObject>(); + } + // It is possible that the row got absorbed in the operator tree. 
+ if (nextRow.o != null) { + // todo this should be changed to be evaluated lazily, especially for single segment case + keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs)); + keys[current].setSecond(nextRow); + return true; + } + } + keys[current] = null; + return false; + } + + protected InspectableObject readRow(int current) throws IOException, HiveException { + return readFromSegment(current); + } + + protected final InspectableObject readFromSegment(int current) throws IOException { + return segments[current].fetchRow(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/RowFetcher.java ql/src/java/org/apache/hadoop/hive/ql/exec/RowFetcher.java new file mode 100644 index 0000000..a6b69f3 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/RowFetcher.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.IOException; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * Generic interface for row fetching. + * The source can be a single stream or multiple streams. The former (FetchOperator) simply reads rows + * from the underlying InputFormat, while the latter (MergeSortingFetcher) merges rows from + * partially sorted streams into a single sorted stream by comparing the provided key(s). + * + * Currently, MergeSortingFetcher is used for SMBJoin or bucketed result fetching for order by. + */ +public interface RowFetcher { + + /** + * Set up the context for fetching and return the ObjectInspector for the returned rows + */ + ObjectInspector setupFetchContext() throws HiveException; + + /** + * Fetch the next row. Return null for EOF (no more rows) + */ + InspectableObject fetchRow() throws IOException, HiveException; + + /** + * Fetch the next row and push it into the operator tree.
Return false for EOF (no more rows) + */ + boolean pushRow() throws IOException, HiveException; + + /** + * Clear context for fetching + */ + void clearFetchContext() throws HiveException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 2c9e81f..42be027 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -30,7 +29,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.io.HiveInputFormat; @@ -48,7 +46,6 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.PriorityQueue; import org.apache.hadoop.util.ReflectionUtils; /** @@ -550,7 +547,7 @@ private void fetchOneRow(byte tag) { Operator forwardOp = conf.getAliasToSink().get(table).getChildOperators().get(0); try { - InspectableObject row = mergeQueue.getNextRow(); + InspectableObject row = mergeQueue.fetchRow(); if (row == null) { fetchDone[tag] = true; return; } @@ -661,160 +658,82 @@ public void setConvertedAutomaticallySMBJoin(boolean convertedAutomaticallySMBJo // returns rows from possibly multiple bucket files of small table in ascending order // by utilizing priority queue (borrowed from hadoop) // elements of queue (Integer) are index to FetchOperator[] (segments) - private class MergeQueue extends PriorityQueue { + private class MergeQueue extends MergeSortingFetcher { - private final String alias; - private final FetchWork fetchWork; - private final JobConf jobConf; - - // for keeping track of the number of elements read. just for debugging - transient int counter; - - transient FetchOperator[] segments; - transient List keyFields; - transient List keyFieldOIs; - transient Operator forwardOp; - transient DummyStoreOperator sinkOp; - - // index of FetchOperator which is providing smallest one - transient Integer currentMinSegment; - transient ObjectPair, InspectableObject>[] keys; + final String alias; + final Operator forwardOp; + final DummyStoreOperator sinkOp; public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf, Operator forwardOp, DummyStoreOperator sinkOp) { + super(fetchWork, jobConf); this.alias = alias; - this.fetchWork = fetchWork; - this.jobConf = jobConf; this.forwardOp = forwardOp; this.sinkOp = sinkOp; } + public int compare(List o1, List o2) { + return compareKeys(o1, o2); + } + + @Override + public ObjectInspector setupFetchContext() throws HiveException { + // all needed metadata is ready in JoinOperator. just use that. + throw new HiveException("not for SMBJoin"); + } + // paths = bucket files of small table for current bucket file of big table // initializes a FetchOperator for each file in paths, reuses FetchOperator if possible // currently, number of paths is always the same (bucket numbers are all the same over // all partitions in a table).
// But if hive supports assigning bucket number for each partition, this can be vary public void setupContext(List paths) throws HiveException { - int segmentLen = paths.size(); FetchOperator.setFetchOperatorContext(jobConf, fetchWork.getPartDir()); - FetchOperator[] segments = segmentsForSize(segmentLen); - for (int i = 0 ; i < segmentLen; i++) { - Path path = paths.get(i); - if (segments[i] == null) { - segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf)); - } - segments[i].setupContext(Arrays.asList(path)); - } - initialize(segmentLen); - for (int i = 0; i < segmentLen; i++) { - if (nextHive(i)) { - put(i); - } - } - counter = 0; - } - - @SuppressWarnings("unchecked") - private FetchOperator[] segmentsForSize(int segmentLen) { - if (segments == null || segments.length < segmentLen) { - FetchOperator[] newSegments = new FetchOperator[segmentLen]; - ObjectPair, InspectableObject>[] newKeys = new ObjectPair[segmentLen]; - if (segments != null) { - System.arraycopy(segments, 0, newSegments, 0, segments.length); - System.arraycopy(keys, 0, newKeys, 0, keys.length); - } - segments = newSegments; - keys = newKeys; - } - return segments; - } - - public void clearFetchContext() throws HiveException { - if (segments != null) { - for (FetchOperator op : segments) { - if (op != null) { - op.clearFetchContext(); - } - } - } + setupSegments(paths); } @Override - protected boolean lessThan(Object a, Object b) { - return compareKeys(keys[(Integer) a].getFirst(), keys[(Integer)b].getFirst()) < 0; - } - - public final InspectableObject getNextRow() throws IOException { - if (currentMinSegment != null) { - adjustPriorityQueue(currentMinSegment); - } - Integer current = top(); + public InspectableObject fetchRow() throws IOException, HiveException { + InspectableObject current = super.fetchRow(); if (current == null) { LOG.info("MergeQueue forwarded " + counter + " rows"); return null; } - counter++; - return keys[currentMinSegment = current].getSecond(); - } - - private void adjustPriorityQueue(Integer current) throws IOException { - if (nextIO(current)) { - adjustTop(); // sort - } else { - pop(); - } + return current; } - // wrapping for exception handling - private boolean nextHive(Integer current) throws HiveException { - try { - return next(current); - } catch (IOException e) { - throw new HiveException(e); - } - } - - // wrapping for exception handling - private boolean nextIO(Integer current) throws IOException { - try { - return next(current); - } catch (HiveException e) { - throw new IOException(e); - } + @Override + public boolean pushRow() throws IOException, HiveException { + throw new HiveException("not for SMBJoin"); } // return true if current min segment(FetchOperator) has next row - private boolean next(Integer current) throws IOException, HiveException { + @Override + protected final boolean next(int current) throws IOException, HiveException { if (keyFields == null) { byte tag = tagForAlias(alias); // joinKeys/joinKeysOI are initialized after making merge queue, so setup lazily at runtime keyFields = joinKeys[tag]; keyFieldOIs = joinKeysObjectInspectors[tag]; } - InspectableObject nextRow = segments[current].getNextRow(); - while (nextRow != null) { - sinkOp.reset(); - if (keys[current] == null) { - keys[current] = new ObjectPair, InspectableObject>(); - } + return super.next(current); + } + @Override + protected InspectableObject readRow(int current) throws IOException, HiveException { + sinkOp.reset(); + InspectableObject nextRow = readFromSegment(current); + for (; 
nextRow != null; nextRow = readFromSegment(current)) { // Pass the row though the operator tree. It is guaranteed that not more than 1 row can // be produced from a input row. forwardOp.processOp(nextRow.o, 0); nextRow = sinkOp.getResult(); - - // It is possible that the row got absorbed in the operator tree. if (nextRow.o != null) { - // todo this should be changed to be evaluated lazily, especially for single segment case - keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs)); - keys[current].setSecond(nextRow); - return true; + return nextRow; } - nextRow = segments[current].getNextRow(); } - keys[current] = null; - return false; + return null; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 9f3df99..15b8151 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -398,7 +398,7 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket Operator forwardOp = work.getAliasToWork().get(alias); // walk through the operator tree while (!forwardOp.getDone()) { - InspectableObject row = fetchOp.getNextRow(); + InspectableObject row = fetchOp.fetchRow(); if (row == null) { break; } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java index cf6941c..eb69ae8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; /** * Implementation of the query block. @@ -88,6 +89,11 @@ */ private QBSubQuery havingClauseSubQueryPredicate; + /** + * for parallel order by, result is bucketed on this exprs. 
fetch task merges on the run + */ + private List bucketKeys; + // results public void print(String msg) { @@ -387,4 +393,11 @@ public QBSubQuery getHavingClauseSubQueryPredicate() { return havingClauseSubQueryPredicate; } + public List getBucketKeys() { + return bucketKeys; + } + + public void setBucketKeys(List bucketKeys) { + this.bucketKeys = bucketKeys; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index c2d5c8c..d297323 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6916,17 +6916,23 @@ private Operator genReduceSinkPlan(String dest, QB qb, Operator input, sortExprs = qb.getParseInfo().getSortByForClause(dest); } + boolean useBucketingForOrderby = false; if (sortExprs == null) { sortExprs = qb.getParseInfo().getOrderByForClause(dest); if (sortExprs != null) { - assert numReducers == 1; - // in strict mode, in the presence of order by, limit must be specified - Integer limit = qb.getParseInfo().getDestLimit(dest); - if (conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase( - "strict") + if (qb.getIsQuery() && !qb.getParseInfo().getIsSubQ() && !qb.isAnalyzeRewrite() + && conf.getIntVar(ConfVars.HIVEPARALLELORDERBYBUCKETINGNUM) != 0) { + numReducers = conf.getIntVar(ConfVars.HIVEPARALLELORDERBYBUCKETINGNUM); + useBucketingForOrderby = true; + } else { + assert numReducers == 1; + // in strict mode, in the presence of order by, limit must be specified + Integer limit = qb.getParseInfo().getDestLimit(dest); + if (conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase("strict") && limit == null) { - throw new SemanticException(generateErrorMessage(sortExprs, + throw new SemanticException(generateErrorMessage(sortExprs, ErrorMsg.NO_LIMIT_WITH_ORDERBY.getMsg())); + } } } } @@ -6957,6 +6963,9 @@ private Operator genReduceSinkPlan(String dest, QB qb, Operator input, sortCols.add(exprNode); sortColsBack.add(ExprNodeDescUtils.backtrack(exprNode, dummy, input)); } + if (useBucketingForOrderby) { + qb.setBucketKeys(sortCols); + } } // For the generation of the values expression just get the inputs // signature and generate field expressions for those diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 23fbbe1..d005737 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -121,6 +121,7 @@ public void compile(final ParseContext pCtx, final List> rowsComputedFromStats; private transient StructObjectInspector statRowOI; + private List mergeKeys; + /** * Serialization Null Format for the serde used to fetch data. */ @@ -278,6 +280,23 @@ public SplitSample getSplitSample() { return splitSample; } + public boolean isMergeFetcher() { + return mergeKeys != null && !mergeKeys.isEmpty(); + } + + public List getMergeKeys() { + return mergeKeys; + } + + @Explain(displayName = "Merge Keys") + public String getMergeKeysDesc() { + return mergeKeys == null ? 
null : PlanUtils.getExprListString(mergeKeys); + } + + public void setMergeKeys(List mergeKeys) { + this.mergeKeys = mergeKeys; + } + @Override public String toString() { if (tblDir != null) { diff --git ql/src/test/queries/clientpositive/orderby_query_bucketing.q ql/src/test/queries/clientpositive/orderby_query_bucketing.q new file mode 100644 index 0000000..809ba63 --- /dev/null +++ ql/src/test/queries/clientpositive/orderby_query_bucketing.q @@ -0,0 +1,19 @@ +create table src_100 (key string, value string); +insert into table src_100 select * from src limit 100; + +set hive.parallel.orderby.bucketing.num = 3; + +explain extended select key,value from src_100 order by key; +select key,value from src_100 order by key; + +explain extended select sum(key) as sum, value from src_100 group by value order by sum; +select sum(key) as sum, value from src_100 group by value order by sum; + +-- negative, subquery +explain extended select sum(key), a.value from (select * from src_100 order by key) a group by a.value; + +-- negative, insert +CREATE TABLE insert_temp (key int, value string); + +EXPLAIN extended INSERT INTO TABLE insert_temp SELECT sum(key) as sum, value from src_100 group by value order by sum; + diff --git ql/src/test/results/clientpositive/orderby_query_bucketing.q.out ql/src/test/results/clientpositive/orderby_query_bucketing.q.out new file mode 100644 index 0000000..1f8eed3 --- /dev/null +++ ql/src/test/results/clientpositive/orderby_query_bucketing.q.out @@ -0,0 +1,1062 @@ +PREHOOK: query: create table src_100 (key string, value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src_100 +POSTHOOK: query: create table src_100 (key string, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@src_100 +PREHOOK: query: insert into table src_100 select * from src limit 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src_100 +POSTHOOK: query: insert into table src_100 select * from src limit 100 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@src_100 +POSTHOOK: Lineage: src_100.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: src_100.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: explain extended select key,value from src_100 order by key +PREHOOK: type: QUERY +POSTHOOK: query: explain extended select key,value from src_100 order by key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + +TOK_QUERY + TOK_FROM + TOK_TABREF + TOK_TABNAME + src_100 + TOK_INSERT + TOK_DESTINATION + TOK_DIR + TOK_TMP_FILE + TOK_SELECT + TOK_SELEXPR + TOK_TABLE_OR_COL + key + TOK_SELEXPR + TOK_TABLE_OR_COL + value + TOK_ORDERBY + TOK_TABSORTCOLNAMEASC + TOK_TABLE_OR_COL + key + + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src_100 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col1 (type: 
string) + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: src_100 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.src_100 + numFiles 1 + numRows 100 + rawDataSize 1076 + serialization.ddl struct src_100 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 1176 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.src_100 + numFiles 1 + numRows 100 + rawDataSize 1076 + serialization.ddl struct src_100 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 1176 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.src_100 + name: default.src_100 + Truncated Path -> Alias: + /src_100 [src_100] + Needs Tagging: false + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0,_col1 + columns.types string:string + escape.delim \ + hive.serialization.extend.nesting.levels true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + Merge Keys: _col0 (type: string) + Processor Tree: + ListSink + +PREHOOK: query: select key,value from src_100 order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src_100 +#### A masked pattern was here #### +POSTHOOK: query: select key,value from src_100 order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_100 +#### A masked pattern was here #### +0 val_0 +113 val_113 +128 val_128 +128 val_128 +129 val_129 +145 val_145 +146 val_146 +149 val_149 +15 val_15 +150 val_150 +152 val_152 +153 val_153 +155 val_155 +157 val_157 +162 val_162 +165 val_165 +166 val_166 +167 val_167 +17 val_17 +170 val_170 +174 val_174 +174 val_174 +193 val_193 +193 val_193 +195 val_195 +199 val_199 +20 val_20 +203 val_203 +205 val_205 +207 val_207 +208 val_208 +209 val_209 +213 val_213 +219 val_219 +221 val_221 +224 val_224 +237 val_237 +238 val_238 +247 val_247 +252 val_252 +255 val_255 +265 val_265 +266 val_266 +27 val_27 +273 
val_273 +277 val_277 +278 val_278 +281 val_281 +287 val_287 +292 val_292 +302 val_302 +309 val_309 +311 val_311 +311 val_311 +316 val_316 +325 val_325 +327 val_327 +338 val_338 +339 val_339 +342 val_342 +345 val_345 +365 val_365 +367 val_367 +369 val_369 +37 val_37 +374 val_374 +377 val_377 +378 val_378 +394 val_394 +396 val_396 +397 val_397 +399 val_399 +401 val_401 +403 val_403 +406 val_406 +409 val_409 +413 val_413 +417 val_417 +417 val_417 +429 val_429 +430 val_430 +438 val_438 +439 val_439 +446 val_446 +455 val_455 +459 val_459 +466 val_466 +469 val_469 +475 val_475 +482 val_482 +484 val_484 +489 val_489 +489 val_489 +494 val_494 +495 val_495 +57 val_57 +66 val_66 +82 val_82 +86 val_86 +98 val_98 +PREHOOK: query: explain extended select sum(key) as sum, value from src_100 group by value order by sum +PREHOOK: type: QUERY +POSTHOOK: query: explain extended select sum(key) as sum, value from src_100 group by value order by sum +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + +TOK_QUERY + TOK_FROM + TOK_TABREF + TOK_TABNAME + src_100 + TOK_INSERT + TOK_DESTINATION + TOK_DIR + TOK_TMP_FILE + TOK_SELECT + TOK_SELEXPR + TOK_FUNCTION + sum + TOK_TABLE_OR_COL + key + sum + TOK_SELEXPR + TOK_TABLE_OR_COL + value + TOK_GROUPBY + TOK_TABLE_OR_COL + value + TOK_ORDERBY + TOK_TABSORTCOLNAMEASC + TOK_TABLE_OR_COL + sum + + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src_100 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: value (type: string), key (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: sum(_col1) + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col1 (type: double) + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: src_100 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.src_100 + numFiles 1 + numRows 100 + rawDataSize 1076 + serialization.ddl struct src_100 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 1176 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.src_100 + numFiles 1 + numRows 100 + rawDataSize 1076 + serialization.ddl struct src_100 
{ string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 1176 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.src_100 + name: default.src_100 + Truncated Path -> Alias: + /src_100 [$hdt$_0:$hdt$_0:src_100] + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col1 (type: double), _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types double,string + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + GatherStats: false + Reduce Output Operator + key expressions: _col0 (type: double) + sort order: + + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col1 (type: string) + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10003 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types double,string + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types double,string + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Truncated Path -> Alias: +#### A masked pattern was here #### + Needs Tagging: false + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: double), VALUE._col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0,_col1 + columns.types double:string + escape.delim \ + hive.serialization.extend.nesting.levels true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + Merge Keys: _col0 (type: double) + Processor Tree: + ListSink + +PREHOOK: query: select sum(key) as sum, value from src_100 group by value order by sum +PREHOOK: type: QUERY +PREHOOK: Input: default@src_100 +#### A masked pattern was here #### +POSTHOOK: query: select sum(key) as sum, value from src_100 group by value order by sum +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src_100 +#### A masked pattern was here #### +0.0 val_0 +15.0 val_15 +17.0 val_17 +20.0 val_20 +27.0 val_27 +37.0 val_37 +57.0 val_57 +66.0 val_66 +82.0 val_82 +86.0 val_86 +98.0 val_98 +113.0 val_113 +129.0 val_129 +145.0 val_145 +146.0 val_146 +149.0 val_149 +150.0 val_150 +152.0 val_152 +153.0 val_153 +155.0 val_155 +157.0 val_157 +162.0 val_162 +165.0 val_165 +166.0 val_166 +167.0 val_167 +170.0 val_170 +195.0 val_195 +199.0 val_199 +203.0 val_203 +205.0 val_205 +207.0 val_207 +208.0 val_208 +209.0 val_209 +213.0 val_213 +219.0 val_219 +221.0 val_221 +224.0 val_224 +237.0 val_237 +238.0 val_238 +247.0 val_247 +252.0 val_252 +255.0 val_255 +256.0 val_128 +265.0 val_265 +266.0 val_266 +273.0 val_273 +277.0 val_277 +278.0 val_278 +281.0 val_281 +287.0 val_287 +292.0 val_292 +302.0 val_302 +309.0 val_309 +316.0 val_316 +325.0 val_325 +327.0 val_327 +338.0 val_338 +339.0 val_339 +342.0 val_342 +345.0 val_345 +348.0 val_174 +365.0 val_365 +367.0 val_367 +369.0 val_369 +374.0 val_374 +377.0 val_377 +378.0 val_378 +386.0 val_193 +394.0 val_394 +396.0 val_396 +397.0 val_397 +399.0 val_399 +401.0 val_401 +403.0 val_403 +406.0 val_406 +409.0 val_409 +413.0 val_413 +429.0 val_429 +430.0 val_430 +438.0 val_438 +439.0 val_439 +446.0 val_446 +455.0 val_455 +459.0 val_459 +466.0 val_466 +469.0 val_469 +475.0 val_475 +482.0 val_482 +484.0 val_484 +494.0 val_494 +495.0 val_495 +622.0 val_311 +834.0 val_417 +978.0 val_489 +PREHOOK: query: -- negative, subquery +explain extended select sum(key), a.value from (select * from src_100 order by key) a group by a.value +PREHOOK: type: QUERY +POSTHOOK: query: -- negative, subquery +explain extended select sum(key), a.value from (select * from src_100 order by key) a group by a.value +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + +TOK_QUERY + TOK_FROM + TOK_SUBQUERY + TOK_QUERY + TOK_FROM + TOK_TABREF + TOK_TABNAME + src_100 + TOK_INSERT + TOK_DESTINATION + TOK_DIR + TOK_TMP_FILE + TOK_SELECT + TOK_SELEXPR + TOK_ALLCOLREF + TOK_ORDERBY + TOK_TABSORTCOLNAMEASC + TOK_TABLE_OR_COL + key + a + TOK_INSERT + TOK_DESTINATION + TOK_DIR + TOK_TMP_FILE + TOK_SELECT + TOK_SELEXPR + TOK_FUNCTION + sum + TOK_TABLE_OR_COL + key + TOK_SELEXPR + . + TOK_TABLE_OR_COL + a + value + TOK_GROUPBY + . 
+ TOK_TABLE_OR_COL + a + value + + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src_100 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col1 (type: string) + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: src_100 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.src_100 + numFiles 1 + numRows 100 + rawDataSize 1076 + serialization.ddl struct src_100 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 1176 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.src_100 + numFiles 1 + numRows 100 + rawDataSize 1076 + serialization.ddl struct src_100 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 1176 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.src_100 + name: default.src_100 + Truncated Path -> Alias: + /src_100 [$hdt$_0:$hdt$_0:src_100] + Needs Tagging: false + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: sum(_col1) + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types string,double + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + GatherStats: false + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce 
partition columns: _col0 (type: string) + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col1 (type: double) + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10003 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types string,double + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types string,double + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Truncated Path -> Alias: +#### A masked pattern was here #### + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col1 (type: double), _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0,_col1 + columns.types double:string + escape.delim \ + hive.serialization.extend.nesting.levels true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: -- negative, insert +CREATE TABLE insert_temp (key int, value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@insert_temp +POSTHOOK: query: -- negative, insert +CREATE TABLE insert_temp (key int, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@insert_temp +PREHOOK: query: EXPLAIN extended INSERT INTO TABLE insert_temp SELECT sum(key) as sum, value from src_100 group by value order by sum +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN extended INSERT INTO TABLE insert_temp SELECT sum(key) as sum, value from src_100 group by value order by sum +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + +TOK_QUERY + TOK_FROM + TOK_TABREF + TOK_TABNAME + src_100 + TOK_INSERT + TOK_INSERT_INTO + TOK_TAB + TOK_TABNAME + insert_temp + TOK_SELECT + TOK_SELEXPR + TOK_FUNCTION + sum + TOK_TABLE_OR_COL + key + sum + TOK_SELEXPR + TOK_TABLE_OR_COL + value + TOK_GROUPBY + TOK_TABLE_OR_COL + value + TOK_ORDERBY + TOK_TABSORTCOLNAMEASC + TOK_TABLE_OR_COL + sum + + +STAGE DEPENDENCIES: + Stage-1 is a root stage 
+ Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src_100 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: value (type: string), key (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: sum(_col1) + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 100 Data size: 1076 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col1 (type: double) + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: src_100 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.src_100 + numFiles 1 + numRows 100 + rawDataSize 1076 + serialization.ddl struct src_100 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 1176 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.src_100 + numFiles 1 + numRows 100 + rawDataSize 1076 + serialization.ddl struct src_100 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 1176 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.src_100 + name: default.src_100 + Truncated Path -> Alias: + /src_100 [$hdt$_0:$hdt$_0:src_100] + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col1 (type: double), _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types double,string + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + TotalFiles: 1 + GatherStats: 
false + MultiFileSpray: false + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + GatherStats: false + Reduce Output Operator + key expressions: _col0 (type: double) + sort order: + + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col1 (type: string) + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10001 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types double,string + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types double,string + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Truncated Path -> Alias: +#### A masked pattern was here #### + Needs Tagging: false + Reduce Operator Tree: + Select Operator + expressions: UDFToInteger(KEY.reducesinkkey0) (type: int), VALUE._col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 1 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 50 Data size: 538 Basic stats: COMPLETE Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,value + columns.comments + columns.types int:string +#### A masked pattern was here #### + name default.insert_temp + serialization.ddl struct insert_temp { i32 key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.insert_temp + TotalFiles: 1 + GatherStats: true + MultiFileSpray: false + + Stage: Stage-0 + Move Operator + tables: + replace: false +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,value + columns.comments + columns.types int:string +#### A masked pattern was here #### + name default.insert_temp + serialization.ddl struct insert_temp { i32 key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.insert_temp + + Stage: Stage-3 + Stats-Aggr Operator +#### A masked pattern was here #### +
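
The heart of this patch is a k-way merge: each of the N reducers of the final order-by stage writes a partially sorted file, and MergeSortingFetcher keeps one FetchOperator per file in a priority queue ordered by each segment's current head row, so fetchRow() always emits the globally smallest row and then re-queues that segment. The standalone sketch below is not part of the patch; the class and method names are illustrative only, and it uses plain java.util collections instead of the patch's org.apache.hadoop.util.PriorityQueue over FetchOperator segments with keys computed by ExprNodeEvaluators.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

  // Merge already-sorted segments into one sorted list, mirroring how
  // MergeSortingFetcher.fetchRow() repeatedly emits the smallest head row.
  static <T> List<T> merge(List<List<T>> sortedSegments, Comparator<T> cmp) {
    List<Iterator<T>> iterators = new ArrayList<>();
    List<T> heads = new ArrayList<>();
    // The queue holds segment indexes, ordered by each segment's current head row
    // (the same trick as lessThan() over keys[] in the patch).
    PriorityQueue<Integer> queue =
        new PriorityQueue<>((a, b) -> cmp.compare(heads.get(a), heads.get(b)));

    for (List<T> segment : sortedSegments) {
      Iterator<T> it = segment.iterator();
      iterators.add(it);
      heads.add(it.hasNext() ? it.next() : null);
      if (heads.get(heads.size() - 1) != null) {
        queue.add(heads.size() - 1);   // only non-empty segments enter the queue
      }
    }

    List<T> merged = new ArrayList<>();
    while (!queue.isEmpty()) {
      int current = queue.poll();      // segment with the smallest head, like top()
      merged.add(heads.get(current));  // emit it, like fetchRow()
      Iterator<T> it = iterators.get(current);
      if (it.hasNext()) {
        heads.set(current, it.next()); // advance and re-queue, like adjustPriorityQueue()
        queue.add(current);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // three "reducer outputs", each sorted on its own
    List<List<Integer>> segments = Arrays.asList(
        Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 8), Arrays.asList(5, 6, 7));
    // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
    System.out.println(merge(segments, Comparator.<Integer>naturalOrder()));
  }
}

With hive.parallel.orderby.bucketing.num set to a positive value, the explain output above shows the corresponding plan shape: the last MapReduce stage keeps multiple reducers and the Fetch Operator carries "Merge Keys", so the total order is only produced while fetching.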