diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4e5df4c..292b1f0 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -766,6 +766,9 @@ HIVEOUTERJOINSUPPORTSFILTERS("hive.outerjoin.supports.filters", true), + // 0 to disable + HIVEPARALLELORDERBYBUCKETINGNUM("hive.parallel.orderby.bucketing.num", 0), + // 'minimal', 'more' (and 'all' later) HIVEFETCHTASKCONVERSION("hive.fetch.task.conversion", "minimal", new StringsValidator("minimal", "more")), diff --git conf/hive-default.xml.template conf/hive-default.xml.template index 7b53239..9f6b30f 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -1989,6 +1989,18 @@ + hive.parallel.orderby.bucketing.num + 0 + + Generally in Hive, an order by clause is executed by a single reducer. But if the query only fetches results + and does not insert into a table or partition, a fully ordered stream can be produced by the fetch task by + merging the partially sorted streams from multiple reducers. + This configuration sets the number of reducers for the last MapReduce job of such an order by. + -1 means it is decided by the usual calculation (based on hive.exec.reducers.bytes.per.reducer). 0 disables this. + + + + hive.fetch.task.conversion minimal diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 6daf199..f6b0491 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -34,10 +34,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; -import org.apache.hadoop.hive.ql.exec.FooterBuffer; import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveRecordReader; @@ -48,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory; @@ -59,11 +56,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; @@ -76,7 +70,7 @@ /** * FetchTask implementation. 
**/ -public class FetchOperator implements Serializable { +public class FetchOperator implements RowFetcher, Serializable { static Log LOG = LogFactory.getLog(FetchOperator.class.getName()); static LogHelper console = new LogHelper(LOG); @@ -330,7 +324,7 @@ private void getNextPath() throws Exception { if (isNativeTable) { FileSystem fs = currPath.getFileSystem(job); if (fs.exists(currPath)) { - FileStatus[] fStats = listStatusUnderPath(fs, currPath); + FileStatus[] fStats = listStatusUnderPath(fs, currPath, job); for (FileStatus fStat : fStats) { if (fStat.getLen() > 0) { tblDataDone = true; @@ -365,7 +359,7 @@ private void getNextPath() throws Exception { } FileSystem fs = nxt.getFileSystem(job); if (fs.exists(nxt)) { - FileStatus[] fStats = listStatusUnderPath(fs, nxt); + FileStatus[] fStats = listStatusUnderPath(fs, nxt, job); for (FileStatus fStat : fStats) { if (fStat.getLen() > 0) { currPath = nxt; @@ -540,6 +534,14 @@ public boolean pushRow() throws IOException, HiveException { return row != null; } + public ObjectInspector setupFetchContext() throws HiveException { + return getOutputObjectInspector(); + } + + public InspectableObject fetchRow() throws IOException { + return getNextRow(); + } + protected void pushRow(InspectableObject row) throws HiveException { operator.processOp(row.o, 0); } @@ -742,8 +744,10 @@ public ObjectInspector getOutputObjectInspector() throws HiveException { * * @return list of file status entries */ - private FileStatus[] listStatusUnderPath(FileSystem fs, Path p) throws IOException { - boolean recursive = HiveConf.getBoolVar(job, HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE); + public static FileStatus[] listStatusUnderPath(FileSystem fs, Path p, JobConf job) + throws IOException { + HiveConf hiveConf = new HiveConf(job, FetchOperator.class); + boolean recursive = hiveConf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE); if (!recursive) { return fs.listStatus(p); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 770d904..6bbcd84 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -35,7 +35,10 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; @@ -46,9 +49,8 @@ private static final long serialVersionUID = 1L; private int maxRows = 100; - private FetchOperator fetch; - private ListSinkOperator sink; private int totalRows; + private RowProcessor processor; private static transient final Log LOG = LogFactory.getLog(FetchTask.class); @@ -74,10 +76,7 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { // push down filters HiveInputFormat.pushFilters(job, ts); } - sink = work.getSink(); - fetch = new FetchOperator(work, job, source, getVirtualColumns(source)); - source.initialize(conf, new ObjectInspector[]{fetch.getOutputObjectInspector()}); - totalRows = 0; + processor = createProcessor(work, conf, job); } catch (Exception e) { // Bail out ungracefully - we should never hit @@ -87,6 +86,70 @@ public void 
initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { } } + private RowProcessor createProcessor(FetchWork work, HiveConf conf, JobConf job) + throws HiveException { + final Operator source = work.getSource(); + + RowFetcher fetcher; + if (!work.isMergeFetcher()) { + fetcher = new FetchOperator(work, job, source, getVirtualColumns(source)); + } else { + fetcher = new MergeSortingFetcher(work, job) { + private boolean initialized; + public int compare(List o1, List o2) { + return compareKeys(o1, o2); + } + @Override + public InspectableObject fetchRow() throws IOException, HiveException { + if (!initialized) { + // this is called in here because setupFetchContext() is called in Driver + // before executing plan + setupSegments(getPaths(fetchWork.getTblDir(), jobConf)); + initialized = true; + } + return super.fetchRow(); + } + @Override + public boolean pushRow() throws IOException, HiveException { + InspectableObject row = fetchRow(); + if (row != null) { + source.processOp(row.o, 0); + } else { + source.flush(); + } + return row != null; + } + }; + } + source.initialize(conf, new ObjectInspector[]{fetcher.setupFetchContext()}); + return new RowProcessor(fetcher, work.getSink()); + } + + private int compareKeys(List k1, List k2) { + int ret = k1.size() - k2.size(); + if (ret != 0) { + return ret; + } + for (int i = 0; i < k1.size(); i++) { + WritableComparable key_1 = (WritableComparable) k1.get(i); + WritableComparable key_2 = (WritableComparable) k2.get(i); + if (key_1 == null && key_2 == null) { + return -1; + } + if (key_1 == null) { + return -1; + } + if (key_2 == null) { + return 1; + } + ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2); + if(ret != 0) { + return ret; + } + } + return ret; + } + private List getVirtualColumns(Operator ts) { if (ts instanceof TableScanOperator && ts.getConf() != null) { return ((TableScanOperator)ts).getConf().getVirtualCols(); @@ -122,19 +185,19 @@ public void setMaxRows(int maxRows) { } public boolean fetch(List res) throws IOException, CommandNeedRetryException { - sink.reset(res); + processor.reset(res); int rowsRet = work.getLeastNumRows(); if (rowsRet <= 0) { rowsRet = work.getLimit() >= 0 ? 
Math.min(work.getLimit() - totalRows, maxRows) : maxRows; } try { if (rowsRet <= 0) { - fetch.clearFetchContext(); + processor.clearFetchContext(); return false; } boolean fetched = false; - while (sink.getNumRows() < rowsRet) { - if (!fetch.pushRow()) { + while (processor.getNumRows() < rowsRet) { + if (!processor.pushRow()) { if (work.getLeastNumRows() > 0) { throw new CommandNeedRetryException(); } @@ -150,7 +213,7 @@ public boolean fetch(List res) throws IOException, CommandNeedRetryException { } catch (Exception e) { throw new IOException(e); } finally { - totalRows += sink.getNumRows(); + totalRows += processor.getNumRows(); } } @@ -170,8 +233,35 @@ public String getName() { * @throws HiveException */ public void clearFetch() throws HiveException { - if (fetch != null) { - fetch.clearFetchContext(); + if (processor != null) { + processor.clearFetchContext(); + } + } + + private class RowProcessor { + + private final RowFetcher fetcher; + private final ListSinkOperator sink; + + public RowProcessor(RowFetcher fetcher, ListSinkOperator sink) { + this.fetcher = fetcher; + this.sink = sink; + } + + public void reset(List res) { + sink.reset(res); + } + + public boolean pushRow() throws IOException, HiveException { + return fetcher.pushRow(); + } + + public int getNumRows() { + return sink.getNumRows(); + } + + public void clearFetchContext() throws HiveException { + fetcher.clearFetchContext(); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MergeSortingFetcher.java ql/src/java/org/apache/hadoop/hive/ql/exec/MergeSortingFetcher.java new file mode 100644 index 0000000..8df1fb6 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MergeSortingFetcher.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.PriorityQueue; + +/** + * Merges partially sorted multiple streams into single one. + * Before fetching, setupSegments() should be called (can be reused by calling it again). 
+ * + * setupFetchContext() makes OIs and evals for keys, which is needed for sorting. + * But for SMBJoin, it's already prepared in operator and uses it instead. + */ +public abstract class MergeSortingFetcher extends PriorityQueue + implements RowFetcher, Comparator> { + + protected final FetchWork fetchWork; + protected final JobConf jobConf; + + // for keeping track of the number of elements read. just for debugging + protected transient int counter; + + protected transient FetchOperator[] segments; + protected transient List keyFields; + protected transient List keyFieldOIs; + + // index of FetchOperator which is providing smallest one + protected transient Integer currentMinSegment; + protected transient ObjectPair, InspectableObject>[] keys; + + public MergeSortingFetcher(FetchWork fetchWork, JobConf jobConf) { + this.fetchWork = fetchWork; + this.jobConf = jobConf; + } + + public ObjectInspector setupFetchContext() throws HiveException { + TableDesc table = fetchWork.getTblDesc(); + ObjectInspector rowOI = getObjectInspector(table, jobConf); + List exprs = fetchWork.getMergeKeys(); + keyFieldOIs = new ArrayList(exprs.size()); + keyFields = new ArrayList(exprs.size()); + for (ExprNodeDesc expr : exprs) { + ExprNodeEvaluator evaluator = ExprNodeEvaluatorFactory.get(expr); + keyFieldOIs.add(evaluator.initialize(rowOI)); + keyFields.add(evaluator); + } + return rowOI; + } + + protected List getPaths(Path tblPath, JobConf conf) throws HiveException { + try { + FileSystem fs = tblPath.getFileSystem(conf); + List paths = new ArrayList(); + for (FileStatus status : FetchOperator.listStatusUnderPath(fs, tblPath, conf)) { + if (!status.isDir() && status.getLen() > 0) { + paths.add(status.getPath()); + } + } + return paths; + } catch (IOException e) { + throw new HiveException(e); + } + } + + protected ObjectInspector getObjectInspector(TableDesc table, JobConf conf) + throws HiveException { + try { + Deserializer serde = table.getDeserializerClass().newInstance(); + serde.initialize(conf, table.getProperties()); + return serde.getObjectInspector(); + } catch (Exception e) { + throw new HiveException(e); + } + } + + // Setup FetchOperators for reading. 
This should be called just before row fetching + protected void setupSegments(List paths) throws HiveException { + int segmentLen = paths.size(); + FetchOperator[] segments = segmentsForSize(segmentLen); + for (int i = 0; i < segmentLen; i++) { + Path path = paths.get(i); + if (segments[i] == null) { + segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf)); + } + segments[i].setupContext(Arrays.asList(path)); + } + initialize(segmentLen); + for (int i = 0; i < segmentLen; i++) { + if (nextHive(i)) { + put(i); + } + } + counter = 0; + } + + @SuppressWarnings("unchecked") + private FetchOperator[] segmentsForSize(int segmentLen) { + if (segments == null || segments.length < segmentLen) { + FetchOperator[] newSegments = new FetchOperator[segmentLen]; + ObjectPair, InspectableObject>[] newKeys = new ObjectPair[segmentLen]; + if (segments != null) { + System.arraycopy(segments, 0, newSegments, 0, segments.length); + System.arraycopy(keys, 0, newKeys, 0, keys.length); + } + segments = newSegments; + keys = newKeys; + } + return segments; + } + + public void clearFetchContext() throws HiveException { + if (segments != null) { + for (FetchOperator op : segments) { + if (op != null) { + op.clearFetchContext(); + } + } + } + } + + protected final boolean lessThan(Object a, Object b) { + return compare(keys[(Integer) a].getFirst(), keys[(Integer) b].getFirst()) < 0; + } + + public InspectableObject fetchRow() throws IOException, HiveException { + if (currentMinSegment != null) { + adjustPriorityQueue(currentMinSegment); + } + Integer current = top(); + if (current == null) { + return null; + } + counter++; + return keys[currentMinSegment = current].getSecond(); + } + + private void adjustPriorityQueue(int current) throws IOException { + if (nextIO(current)) { + adjustTop(); // sort + } else { + pop(); + } + } + + // wrapping for exception handling + private boolean nextHive(int current) throws HiveException { + try { + return next(current); + } catch (IOException e) { + throw new HiveException(e); + } + } + + // wrapping for exception handling + private boolean nextIO(int current) throws IOException { + try { + return next(current); + } catch (HiveException e) { + throw new IOException(e); + } + } + + // return true if current min segment(FetchOperator) has next row + protected boolean next(int current) throws IOException, HiveException { + InspectableObject nextRow = readRow(current); + for (; nextRow != null; nextRow = readRow(current)) { + if (keys[current] == null) { + keys[current] = new ObjectPair, InspectableObject>(); + } + // It is possible that the row got absorbed in the operator tree. 
+ if (nextRow.o != null) { + // todo this should be changed to be evaluated lazily, especially for single segment case + keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs)); + keys[current].setSecond(nextRow); + return true; + } + } + keys[current] = null; + return false; + } + + protected InspectableObject readRow(int current) throws IOException, HiveException { + return readFromSegment(current); + } + + protected final InspectableObject readFromSegment(int current) throws IOException { + return segments[current].getNextRow(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/RowFetcher.java ql/src/java/org/apache/hadoop/hive/ql/exec/RowFetcher.java new file mode 100644 index 0000000..a6b69f3 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/RowFetcher.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.IOException; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * Generic interface for row fetching. + * The source can be a single stream or multiple streams. The former (FetchOperator) simply reads rows + * from the underlying InputFormat, and the latter (MergeSortingFetcher) merges rows from + * partially sorted streams into a single sorted stream by comparing the provided key(s). + * + * Currently, MergeSortingFetcher is used for SMBJoin and for fetching bucketed order by results. + */ +public interface RowFetcher { + + /** + * Setup the context for fetching and return the ObjectInspector for the rows to be returned + */ + ObjectInspector setupFetchContext() throws HiveException; + + /** + * Fetch the next row. Returns null on EOF (no more rows) + */ + InspectableObject fetchRow() throws IOException, HiveException; + + /** + * Fetch the next row and push it into the operator tree. 
Returns false when there are no more rows to push. + */ + boolean pushRow() throws IOException, HiveException; + + /** + * Clear context for fetching + */ + void clearFetchContext() throws HiveException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 487bb33..91931d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -30,7 +29,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.io.HiveInputFormat; @@ -48,7 +46,6 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.PriorityQueue; import org.apache.hadoop.util.ReflectionUtils; /** @@ -550,7 +547,7 @@ private void fetchOneRow(byte tag) { Operator forwardOp = conf.getAliasToSink().get(table).getChildOperators().get(0); try { - InspectableObject row = mergeQueue.getNextRow(); + InspectableObject row = mergeQueue.fetchRow(); if (row == null) { fetchDone[tag] = true; return; @@ -661,159 +658,81 @@ public void setConvertedAutomaticallySMBJoin(boolean convertedAutomaticallySMBJo // returns rows from possibly multiple bucket files of small table in ascending order // by utilizing priority queue (borrowed from hadoop) // elements of queue (Integer) are index to FetchOperator[] (segments) - private class MergeQueue extends PriorityQueue { + private class MergeQueue extends MergeSortingFetcher { - private final String alias; - private final FetchWork fetchWork; - private final JobConf jobConf; - - // for keeping track of the number of elements read. just for debugging - transient int counter; - - transient FetchOperator[] segments; - transient List keyFields; - transient List keyFieldOIs; - transient Operator forwardOp; - transient DummyStoreOperator sinkOp; - - // index of FetchOperator which is providing smallest one - transient Integer currentMinSegment; - transient ObjectPair, InspectableObject>[] keys; + final String alias; + final Operator forwardOp; + final DummyStoreOperator sinkOp; public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf, Operator forwardOp, DummyStoreOperator sinkOp) { + super(fetchWork, jobConf); this.alias = alias; - this.fetchWork = fetchWork; - this.jobConf = jobConf; this.forwardOp = forwardOp; this.sinkOp = sinkOp; } + public int compare(List o1, List o2) { + return compareKeys(o1, o2); + } + + @Override + public ObjectInspector setupFetchContext() throws HiveException { + // all needed metadata is ready in JoinOperator. just use that. + throw new HiveException("not for SMBJoin"); + } + // paths = bucket files of small table for current bucket file of big table // initializes a FetchOperator for each file in paths, reuses FetchOperator if possible // currently, number of paths is always the same (bucket numbers are all the same over // all partitions in a table). 
// But if hive supports assigning bucket number for each partition, this can be vary public void setupContext(List paths) throws HiveException { - int segmentLen = paths.size(); - FetchOperator[] segments = segmentsForSize(segmentLen); - for (int i = 0 ; i < segmentLen; i++) { - Path path = paths.get(i); - if (segments[i] == null) { - segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf)); - } - segments[i].setupContext(Arrays.asList(path)); - } - initialize(segmentLen); - for (int i = 0; i < segmentLen; i++) { - if (nextHive(i)) { - put(i); - } - } - counter = 0; - } - - @SuppressWarnings("unchecked") - private FetchOperator[] segmentsForSize(int segmentLen) { - if (segments == null || segments.length < segmentLen) { - FetchOperator[] newSegments = new FetchOperator[segmentLen]; - ObjectPair, InspectableObject>[] newKeys = new ObjectPair[segmentLen]; - if (segments != null) { - System.arraycopy(segments, 0, newSegments, 0, segments.length); - System.arraycopy(keys, 0, newKeys, 0, keys.length); - } - segments = newSegments; - keys = newKeys; - } - return segments; - } - - public void clearFetchContext() throws HiveException { - if (segments != null) { - for (FetchOperator op : segments) { - if (op != null) { - op.clearFetchContext(); - } - } - } + setupSegments(paths); } @Override - protected boolean lessThan(Object a, Object b) { - return compareKeys(keys[(Integer) a].getFirst(), keys[(Integer)b].getFirst()) < 0; - } - - public final InspectableObject getNextRow() throws IOException { - if (currentMinSegment != null) { - adjustPriorityQueue(currentMinSegment); - } - Integer current = top(); + public InspectableObject fetchRow() throws IOException, HiveException { + InspectableObject current = super.fetchRow(); if (current == null) { LOG.info("MergeQueue forwarded " + counter + " rows"); return null; } - counter++; - return keys[currentMinSegment = current].getSecond(); - } - - private void adjustPriorityQueue(Integer current) throws IOException { - if (nextIO(current)) { - adjustTop(); // sort - } else { - pop(); - } + return current; } - // wrapping for exception handling - private boolean nextHive(Integer current) throws HiveException { - try { - return next(current); - } catch (IOException e) { - throw new HiveException(e); - } - } - - // wrapping for exception handling - private boolean nextIO(Integer current) throws IOException { - try { - return next(current); - } catch (HiveException e) { - throw new IOException(e); - } + @Override + public boolean pushRow() throws IOException, HiveException { + throw new HiveException("not for SMBJoin"); } // return true if current min segment(FetchOperator) has next row - private boolean next(Integer current) throws IOException, HiveException { + @Override + protected final boolean next(int current) throws IOException, HiveException { if (keyFields == null) { byte tag = tagForAlias(alias); // joinKeys/joinKeysOI are initialized after making merge queue, so setup lazily at runtime keyFields = joinKeys[tag]; keyFieldOIs = joinKeysObjectInspectors[tag]; } - InspectableObject nextRow = segments[current].getNextRow(); - while (nextRow != null) { - sinkOp.reset(); - if (keys[current] == null) { - keys[current] = new ObjectPair, InspectableObject>(); - } + return super.next(current); + } + @Override + protected InspectableObject readRow(int current) throws IOException, HiveException { + sinkOp.reset(); + InspectableObject nextRow = readFromSegment(current); + for (; nextRow != null; nextRow = readFromSegment(current)) { // Pass the row though 
the operator tree. It is guaranteed that not more than 1 row can // be produced from a input row. forwardOp.processOp(nextRow.o, 0); nextRow = sinkOp.getResult(); - - // It is possible that the row got absorbed in the operator tree. if (nextRow.o != null) { - // todo this should be changed to be evaluated lazily, especially for single segment case - keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs)); - keys[current].setSecond(nextRow); - return true; + return nextRow; } - nextRow = segments[current].getNextRow(); } - keys[current] = null; - return false; + return null; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java index a8b436e..8b0a389 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; /** * Implementation of the query block. @@ -76,7 +77,11 @@ * clause. */ private int numSubQueryPredicates; - + + /** + * for parallel order by, result is bucketed on this exprs. fetch task merges on the run + */ + private List bucketKeys; // results @@ -341,4 +346,11 @@ protected int incrNumSubQueryPredicates() { return ++numSubQueryPredicates; } + public List getBucketKeys() { + return bucketKeys; + } + + public void setBucketKeys(List bucketKeys) { + this.bucketKeys = bucketKeys; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 3b33dc2..cc24808 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6404,17 +6404,23 @@ private Operator genReduceSinkPlan(String dest, QB qb, Operator input, sortExprs = qb.getParseInfo().getSortByForClause(dest); } + boolean useBucketingForOrderby = false; if (sortExprs == null) { sortExprs = qb.getParseInfo().getOrderByForClause(dest); if (sortExprs != null) { - assert numReducers == 1; - // in strict mode, in the presence of order by, limit must be specified - Integer limit = qb.getParseInfo().getDestLimit(dest); - if (conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase( - "strict") + if (qb.getIsQuery() && !qb.getParseInfo().getIsSubQ() && !qb.isAnalyzeRewrite() + && conf.getIntVar(ConfVars.HIVEPARALLELORDERBYBUCKETINGNUM) != 0) { + numReducers = conf.getIntVar(ConfVars.HIVEPARALLELORDERBYBUCKETINGNUM); + useBucketingForOrderby = true; + } else { + assert numReducers == 1; + // in strict mode, in the presence of order by, limit must be specified + Integer limit = qb.getParseInfo().getDestLimit(dest); + if (conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase("strict") && limit == null) { - throw new SemanticException(generateErrorMessage(sortExprs, + throw new SemanticException(generateErrorMessage(sortExprs, ErrorMsg.NO_LIMIT_WITH_ORDERBY.getMsg())); + } } } } @@ -6441,6 +6447,9 @@ private Operator genReduceSinkPlan(String dest, QB qb, Operator input, ExprNodeDesc exprNode = genExprNodeDesc(cl, inputRR); sortCols.add(exprNode); } + if (useBucketingForOrderby) { + qb.setBucketKeys(sortCols); + } } // For the generation of the values expression just get the inputs diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java 
ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index b58a0a3..c2b8b51 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -119,6 +119,7 @@ public void compile(final ParseContext pCtx, final List> rowsComputedFromStats; private transient ObjectInspector statRowOI; + private List mergeKeys; + /** * Serialization Null Format for the serde used to fetch data. */ @@ -271,6 +273,23 @@ public SplitSample getSplitSample() { return splitSample; } + public boolean isMergeFetcher() { + return mergeKeys != null && !mergeKeys.isEmpty(); + } + + public List getMergeKeys() { + return mergeKeys; + } + + @Explain(displayName = "Merge Keys") + public String getMergeKeysDesc() { + return mergeKeys == null ? null : PlanUtils.getExprListString(mergeKeys); + } + + public void setMergeKeys(List mergeKeys) { + this.mergeKeys = mergeKeys; + } + @Override public String toString() { if (tblDir != null) { diff --git ql/src/test/queries/clientpositive/orderby_query_bucketing.q ql/src/test/queries/clientpositive/orderby_query_bucketing.q new file mode 100644 index 0000000..0f0cc0e --- /dev/null +++ ql/src/test/queries/clientpositive/orderby_query_bucketing.q @@ -0,0 +1,19 @@ +create table src_100 (key string, value string); +insert into table src_100 select * from src limit 100; + +set hive.parallel.orderby.bucketing.num = 3; + +explain extended select key,value from src_100 order by key; +select key,value from src_100 order by key; + +explain extended select sum(key) as sum, value from src_100 group by value order by sum; +select sum(key) as sum, value from src_100 group by value order by sum; + +-- negative, subquery +explain extended select sum(key), a.value from (select * from src_100 order by key) a group by a.value; + +-- negative, insert +CREATE TABLE insert_temp (key int, value string); + +EXPLAIN extended INSERT INTO TABLE insert_temp SELECT sum(key) as sum, value from src_100 group by value order by sum; + diff --git ql/src/test/results/clientpositive/orderby_query_bucketing.q.out ql/src/test/results/clientpositive/orderby_query_bucketing.q.out new file mode 100644 index 0000000..c02b1c9 Binary files /dev/null and ql/src/test/results/clientpositive/orderby_query_bucketing.q.out differ
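For reference, a minimal usage sketch of the feature this patch adds (illustrative only, not part of the diff; it assumes the hive.parallel.orderby.bucketing.num property defined above and the src_100 table created in the test file):

-- run the order by with 3 reducers; the fetch task merge-sorts their partially sorted outputs
set hive.parallel.orderby.bucketing.num = 3;
-- the explain output should list the sort expressions as "Merge Keys" on the fetch stage
explain select key, value from src_100 order by key;
select key, value from src_100 order by key;
-- 0 restores the default single-reducer order by
set hive.parallel.orderby.bucketing.num = 0;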