Index: build-common.xml
===================================================================
--- build-common.xml	(revision 1495743)
+++ build-common.xml	(working copy)
@@ -59,7 +59,7 @@
-
+
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1495743)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -560,6 +560,11 @@
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
     HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
     HIVEOPTREDUCEDEDUPLICATIONMINREDUCER("hive.optimize.reducededuplication.min.reducer", 4),
+
+    HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false),
+    HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000),
+    HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f),
+
     // whether to optimize union followed by select followed by filesink
     // It creates sub-directories in the final output, so should not be turned on in systems
     // where MAPREDUCE-1501 is not present
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -74,6 +74,7 @@
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
@@ -84,6 +85,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
 import org.apache.log4j.Appender;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.FileAppender;
@@ -416,6 +418,17 @@
     addInputPaths(job, work, emptyScratchDirStr, ctx);

     Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+
+    if (work.getSamplingType() > 0) {
+      try {
+        handleSampling(driverContext, work, job, new HiveConf(conf));
+        job.setPartitionerClass(HiveTotalOrderPartitioner.class);
+      } catch (Exception e) {
+        LOG.info("Failed to use sampling", e);
+        work.setNumReduceTasks(1); // rollback
+      }
+    }
+
     // remove the pwd from conf file so that job tracker doesn't show this
     // logs
     String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
@@ -511,6 +524,64 @@
     return (returnVal);
   }

+  private void handleSampling(DriverContext context, MapredWork work, JobConf job, HiveConf conf)
+      throws Exception {
+    assert work.getAliasToWork().keySet().size() == 1;
+
+    String alias = work.getAliases().get(0);
+    Operator<?> topOp = work.getAliasToWork().get(alias);
+    PartitionDesc partDesc = work.getAliasToPartnInfo().get(alias);
+
+    ArrayList<String> paths = work.getPaths();
+    ArrayList<PartitionDesc> parts = work.getPartitionDescs();
+
+    Path onePath = new Path(paths.get(0));
+    String tmpPath = context.getCtx().getExternalTmpFileURI(onePath.toUri());
+
+    Path partitionFile = new Path(tmpPath, ".partitions");
+    TotalOrderPartitioner.setPartitionFile(job, partitionFile);
+
+    PartitionKeySampler sampler = new PartitionKeySampler();
+
+    if (work.getSamplingType() == MapredWork.SAMPLING_ON_PREV_MR) {
+      console.printInfo("Use sampling data created in previous MR");
+      // merges sampling data from previous MR and makes partition keys for total sort
+      for (String path : paths) {
+        Path inputPath = new Path(path);
+        FileSystem fs = inputPath.getFileSystem(job);
+        for (FileStatus status : fs.globStatus(new Path(inputPath, ".sampling*"))) {
+          sampler.addSampleFile(status.getPath(), job);
+        }
+      }
+    } else if (work.getSamplingType() == MapredWork.SAMPLING_ON_START) {
+      console.printInfo("Creating sampling data..");
+      assert topOp instanceof TableScanOperator;
+      TableScanOperator ts = (TableScanOperator) topOp;
+
+      FetchWork fetchWork;
+      if (!partDesc.isPartitioned()) {
+        assert paths.size() == 1;
+        fetchWork = new FetchWork(paths.get(0), partDesc.getTableDesc());
+      } else {
+        fetchWork = new FetchWork(paths, parts, partDesc.getTableDesc());
+      }
+      fetchWork.setSource(ts);
+
+      // random sampling
+      FetchOperator fetcher = PartitionKeySampler.createSampler(fetchWork, conf, job, ts);
+      try {
+        ts.initialize(conf, new ObjectInspector[]{fetcher.getOutputObjectInspector()});
+        ts.setOutputCollector(sampler);
+        while (fetcher.pushRow()) { }
+      } finally {
+        fetcher.clearFetchContext();
+      }
+    } else {
+      throw new IllegalArgumentException("Invalid sampling type " + work.getSamplingType());
+    }
+    sampler.writePartitionKeys(partitionFile, job);
+  }
+
   /**
    * Set hive input format, and input format file if necessary.
    */
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java	(working copy)
@@ -487,11 +487,15 @@
   public boolean pushRow() throws IOException, HiveException {
     InspectableObject row = getNextRow();
     if (row != null) {
-      operator.process(row.o, 0);
+      pushRow(row);
     }
     return row != null;
   }

+  protected void pushRow(InspectableObject row) throws HiveException {
+    operator.process(row.o, 0);
+  }
+
   private transient final InspectableObject inspectable = new InspectableObject();

   /**
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java	(working copy)
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+
+public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object> {
+
+  private Partitioner<BytesWritable, Object> partitioner
+      = new TotalOrderPartitioner<BytesWritable, Object>();
+
+  public void configure(JobConf job) {
+    JobConf newconf = new JobConf(job);
+    newconf.setMapOutputKeyClass(BytesWritable.class);
+    partitioner.configure(newconf);
+  }
+
+  public int getPartition(HiveKey key, Object value, int numPartitions) {
+    return partitioner.getPartition(key, value, numPartitions);
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java	(working copy)
@@ -1,4 +1,6 @@
 /**
+ * Copyright 2010 The Apache Software Foundation
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +30,11 @@
     return findOperator(start, clazz, new HashSet<T>());
   }

+  public static <T> T findSingleOperator(Operator<?> start, Class<T> clazz) {
+    Set<T> found = findOperator(start, clazz, new HashSet<T>());
+    return found.size() == 1 ? found.iterator().next() : null;
+  }
+
   public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
     Set<T> found = new HashSet<T>();
     for (Operator<?> start : starts) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java	(working copy)
@@ -0,0 +1,156 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
+public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
+
+  public static final Comparator<byte[]> C = new Comparator<byte[]>() {
+    public final int compare(byte[] o1, byte[] o2) {
+      return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
+    }
+  };
+
+  private List<byte[]> sampled = new ArrayList<byte[]>();
+
+  public void addSampleFile(Path inputPath, JobConf job) throws IOException {
+    FileSystem fs = inputPath.getFileSystem(job);
+    FSDataInputStream input = fs.open(inputPath);
+    try {
+      int count = input.readInt();
+      for (int i = 0; i < count; i++) {
+        byte[] key = new byte[input.readInt()];
+        input.readFully(key);
+        sampled.add(key);
+      }
+    } finally {
+      IOUtils.closeStream(input);
+    }
+  }
+
+  // keys from FetchSampler are collected here
+  public void collect(HiveKey key, Object value) throws IOException {
+    sampled.add(Arrays.copyOfRange(key.getBytes(), 0, key.getLength()));
+  }
+
+  // sort and pick partition keys
+  // copied from org.apache.hadoop.mapred.lib.InputSampler
+  private byte[][] getPartitionKeys(int numReduce) {
+    if (sampled.size() < numReduce - 1) {
+      throw new IllegalStateException("not enough number of sample");
+    }
+    byte[][] sorted = sampled.toArray(new byte[sampled.size()][]);
+    Arrays.sort(sorted, C);
+    byte[][] partitionKeys = new byte[numReduce - 1][];
+    float stepSize = sorted.length / (float) numReduce;
+    int last = -1;
+    for (int i = 1; i < numReduce; ++i) {
+      int k = Math.round(stepSize * i);
+      while (last >= k && C.compare(sorted[last], sorted[k]) == 0) {
+        k++;
+      }
+      partitionKeys[i - 1] = sorted[k];
+      last = k;
+    }
+    return partitionKeys;
+  }
+
+  public void writePartitionKeys(Path path, JobConf job) throws IOException {
+    byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks());
+
+    FileSystem fs = path.getFileSystem(job);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, path,
+        BytesWritable.class, NullWritable.class);
+    try {
+      for (byte[] pkey : partitionKeys) {
+        BytesWritable wrapper = new BytesWritable(pkey);
+        writer.append(wrapper, NullWritable.get());
+      }
+    } finally {
+      IOUtils.closeStream(writer);
+    }
+  }
+
+  // random sampling
+  public static FetchSampler createSampler(FetchWork work, HiveConf conf, JobConf job,
+      Operator<?> operator) {
+    int sampleNum = conf.getIntVar(HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY);
+    float samplePercent = conf.getFloatVar(HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY);
+    if (samplePercent < 0.0 || samplePercent > 1.0) {
+      throw new RuntimeException("Percentile value must be within the range of 0 to 1.");
+    }
+    FetchSampler sampler = new FetchSampler(work, job, operator);
+    sampler.setSampleNum(sampleNum);
+    sampler.setSamplePercent(samplePercent);
+    return sampler;
+  }
+
+  private static class FetchSampler extends FetchOperator {
+
+    private int sampleNum = 1000;
+    private float samplePercent = 0.1f;
+    private final Random random = new Random();
+
+    public FetchSampler(FetchWork work, JobConf job, Operator<?> operator) {
+      super(work, job, operator, null);
+    }
+
+    public void setSampleNum(int numSample) {
+      this.sampleNum = numSample;
+    }
+
+    public void setSamplePercent(float samplePercent) {
+      this.samplePercent = samplePercent;
+    }
+
+    @Override
+    public boolean pushRow() throws IOException, HiveException {
+      InspectableObject row = getNextRow();
+      if (row != null && random.nextFloat() < samplePercent) {
+        sampleNum--;
+        pushRow(row);
+      }
+      return sampleNum > 0 && row != null;
+    }
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java	(working copy)
@@ -36,6 +36,8 @@
  */
 public class BucketingSortingCtx implements NodeProcessorCtx {

+  boolean disableBucketing;
+
   // A mapping from an operator to the columns by which it's output is bucketed
   Map<Operator<? extends OperatorDesc>, List<BucketCol>> bucketedColsByOp;
   // A mapping from a directory which a FileSinkOperator writes into to the columns by which that
@@ -48,7 +50,8 @@
   // output is sorted
   Map<String, List<SortCol>> sortedColsByDirectory;

-  public BucketingSortingCtx() {
+  public BucketingSortingCtx(boolean disableBucketing) {
+    this.disableBucketing = disableBucketing;
     this.bucketedColsByOp = new HashMap<Operator<? extends OperatorDesc>, List<BucketCol>>();
     this.bucketedColsByDirectory = new HashMap<String, List<BucketCol>>();
     this.sortedColsByOp = new HashMap<Operator<? extends OperatorDesc>, List<SortCol>>();
@@ -57,21 +60,25 @@

   public List<BucketCol> getBucketedCols(Operator<? extends OperatorDesc> op) {
-    return bucketedColsByOp.get(op);
+    return disableBucketing ? null : bucketedColsByOp.get(op);
   }

   public void setBucketedCols(Operator<? extends OperatorDesc> op, List<BucketCol> bucketCols) {
-    this.bucketedColsByOp.put(op, bucketCols);
+    if (!disableBucketing) {
+      bucketedColsByOp.put(op, bucketCols);
+    }
   }

   public Map<String, List<BucketCol>> getBucketedColsByDirectory() {
-    return bucketedColsByDirectory;
+    return disableBucketing ? null : bucketedColsByDirectory;
   }

   public void setBucketedColsByDirectory(Map<String, List<BucketCol>> bucketedColsByDirectory) {
-    this.bucketedColsByDirectory = bucketedColsByDirectory;
+    if (!disableBucketing) {
+      this.bucketedColsByDirectory = bucketedColsByDirectory;
+    }
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java	(working copy)
@@ -91,7 +91,9 @@
         continue;
       }

-      BucketingSortingCtx bCtx = new BucketingSortingCtx();
+      // uses sampling, which means it's not bucketed
+      boolean disableBucketing = mapRedTask.getWork().getSamplingType() > 0;
+      BucketingSortingCtx bCtx = new BucketingSortingCtx(disableBucketing);

       // RuleRegExp rules are used to match operators anywhere in the tree
       // RuleExactMatch rules are used to specify exactly what the tree should look like
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java	(working copy)
@@ -34,7 +34,7 @@
     Dispatcher dispatcher = new IndexWhereTaskDispatcher(physicalContext);
     GraphWalker opGraphWalker = new DefaultGraphWalker(dispatcher);
     ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(physicalContext.rootTasks);
+    topNodes.addAll(physicalContext.getRootTasks());
     opGraphWalker.startWalking(topNodes, null);

     return physicalContext;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java	(working copy)
@@ -73,7 +73,7 @@

     // get all the tasks nodes from root task
     ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.rootTasks);
+    topNodes.addAll(pctx.getRootTasks());

     // begin to walk through the task tree.
     ogw.startWalking(topNodes, null);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java	(working copy)
@@ -171,7 +171,7 @@
     Dispatcher disp = new MetadataOnlyTaskDispatcher(pctx);
     GraphWalker ogw = new DefaultGraphWalker(disp);
     ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.rootTasks);
+    topNodes.addAll(pctx.getRootTasks());
     ogw.startWalking(topNodes, null);
     return pctx;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java	(working copy)
@@ -72,11 +72,27 @@
     this.context = context;
   }

+  public List<Task<? extends Serializable>> getRootTasks() {
+    return rootTasks;
+  }
+
+  public void setRootTasks(List<Task<? extends Serializable>> rootTasks) {
+    this.rootTasks = rootTasks;
+  }
+
+  public Task<? extends Serializable> getFetchTask() {
+    return fetchTask;
+  }
+
+  public void setFetchTask(Task<? extends Serializable> fetchTask) {
+    this.fetchTask = fetchTask;
+  }
+
   public void addToRootTask(Task<? extends Serializable> tsk){
     rootTasks.add(tsk);
   }
+
   public void removeFromRootTask(Task<? extends Serializable> tsk){
     rootTasks.remove(tsk);
   }
-
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java	(working copy)
@@ -67,6 +67,9 @@
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES)) {
       resolvers.add(new MetadataOnlyOptimizer());
     }
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY)) {
+      resolvers.add(new SamplingOptimizer());
+    }

     // Physical optimizers which follow this need to be careful not to invalidate the inferences
     // made by this optimizer. Only optimizers which depend on the results of this one should
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java	(working copy)
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+
+/**
+ * Mark final MapredWork for ORDER BY to use sampling and set number of reduce tasks to -1
+ */
+public class SamplingOptimizer implements PhysicalPlanResolver {
+
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    for (Task<?> task : pctx.getRootTasks()) {
+      if (!(task instanceof MapRedTask) || !((MapRedTask) task).getWork().isFinalMapRed()) {
+        continue; // this could be replaced by bucketing on RS + bucketed fetcher for next MR
+      }
+      MapredWork mapreWork = ((MapRedTask) task).getWork();
+      if (mapreWork.getNumReduceTasks() != 1 || mapreWork.getAliasToWork().size() != 1 ||
+          mapreWork.getSamplingType() > 0 || mapreWork.getReducer() == null) {
+        continue;
+      }
+      Operator<?> operator = mapreWork.getAliasToWork().values().iterator().next();
+      if (!(operator instanceof TableScanOperator)) {
+        continue;
+      }
+      ReduceSinkOperator child =
+          OperatorUtils.findSingleOperator(operator, ReduceSinkOperator.class);
+      if (child == null ||
+          child.getConf().getNumReducers() != 1 || !child.getConf().getPartitionCols().isEmpty()) {
+        continue;
+      }
+      child.getConf().setNumReducers(-1);
+      mapreWork.setNumReduceTasks(-1);
+      mapreWork.setSamplingType(MapredWork.SAMPLING_ON_START);
+    }
+    return pctx;
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java	(working copy)
@@ -51,7 +51,7 @@
     Dispatcher disp = new SkewJoinTaskDispatcher(pctx);
     GraphWalker ogw = new DefaultGraphWalker(disp);
     ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.rootTasks);
+    topNodes.addAll(pctx.getRootTasks());
     ogw.startWalking(topNodes, null);
     return pctx;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java	(working copy)
@@ -112,6 +112,12 @@
   private final Map<String, List<SortCol>> sortedColsByDirectory =
       new HashMap<String, List<SortCol>>();

+  // use sampled partitioning
+  private int samplingType;
+
+  public static final int SAMPLING_ON_PREV_MR = 1;  // todo HIVE-3841
+  public static final int SAMPLING_ON_START = 2;    // sampling on task running
+
   public MapredWork() {
     aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
   }
@@ -235,6 +241,22 @@
     }
   }

+  public ArrayList<String> getAliases() {
+    return new ArrayList<String>(aliasToWork.keySet());
+  }
+
+  public ArrayList<Operator<?>> getWorks() {
+    return new ArrayList<Operator<?>>(aliasToWork.values());
+  }
+
+  public ArrayList<String> getPaths() {
+    return new ArrayList<String>(pathToAliases.keySet());
+  }
+
+  public ArrayList<PartitionDesc> getPartitionDescs() {
+    return new ArrayList<PartitionDesc>(aliasToPartnInfo.values());
+  }
+
   /**
    * @return the mapredLocalWork
    */
@@ -594,4 +616,18 @@
       }
     }
   }
+
+  public int getSamplingType() {
+    return samplingType;
+  }
+
+  public void setSamplingType(int samplingType) {
+    this.samplingType = samplingType;
+  }
+
+  @Explain(displayName = "Sampling")
+  public String getSamplingTypeString() {
+    return samplingType == 1 ? "SAMPLING_ON_PREV_MR" :
+        samplingType == 2 ? "SAMPLING_ON_START" : null;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java	(working copy)
@@ -242,6 +242,10 @@
     return baseFileName;
   }

+  public boolean isPartitioned() {
+    return partSpec != null && !partSpec.isEmpty();
+  }
+
   @Override
   public PartitionDesc clone() {
     PartitionDesc ret = new PartitionDesc();
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java	(revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java	(working copy)
@@ -149,7 +149,7 @@
       }
       if (state.percentiles == null) {
         if (percentile < 0.0 || percentile > 1.0) {
-          throw new RuntimeException("Percentile value must be wihin the range of 0 to 1.");
+          throw new RuntimeException("Percentile value must be within the range of 0 to 1.");
         }
         state.percentiles = new ArrayList<DoubleWritable>(1);
         state.percentiles.add(new DoubleWritable(percentile.doubleValue()));
@@ -238,7 +238,7 @@
       if(percentiles != null) {
         for (int i = 0; i < percentiles.size(); i++) {
           if (percentiles.get(i).get() < 0.0 || percentiles.get(i).get() > 1.0) {
-            throw new RuntimeException("Percentile value must be wihin the range of 0 to 1.");
+            throw new RuntimeException("Percentile value must be within the range of 0 to 1.");
           }
         }
         state.percentiles = new ArrayList<DoubleWritable>(percentiles);
Index: ql/src/test/queries/clientpositive/parallel_orderby.q
===================================================================
--- ql/src/test/queries/clientpositive/parallel_orderby.q	(revision 0)
+++ ql/src/test/queries/clientpositive/parallel_orderby.q	(working copy)
@@ -0,0 +1,14 @@
+create table src5 (key string, value string);
+load data local inpath '../data/files/kv5.txt' into table src5;
+load data local inpath '../data/files/kv5.txt' into table src5;
+
+set mapred.reduce.tasks = 4;
+set hive.optimize.sampling.orderby=true;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+explain
+create table total_ordered as select * from src5 order by key, value;
+create table total_ordered as select * from src5 order by key, value;
+
+desc formatted total_ordered;
+select * from total_ordered;
Index: ql/src/test/results/clientpositive/parallel_orderby.q.out
===================================================================
--- ql/src/test/results/clientpositive/parallel_orderby.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/parallel_orderby.q.out	(working copy)
@@ -0,0 +1,188 @@
+PREHOOK: query: create table src5 (key string, value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table src5 (key string, value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@src5
+PREHOOK: query: load data local inpath '../data/files/kv5.txt' into table src5
+PREHOOK: type: LOAD
+PREHOOK: Output: default@src5
+POSTHOOK: query: load data local inpath '../data/files/kv5.txt' into table src5
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@src5
+PREHOOK: query: load data local inpath '../data/files/kv5.txt' into table src5
+PREHOOK: type: LOAD
+PREHOOK: Output: default@src5
+POSTHOOK: query: load data local inpath '../data/files/kv5.txt' into table src5
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@src5
+PREHOOK: query: explain
+create table total_ordered as select * from src5 order by key, value
+PREHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: query: explain
+create table total_ordered as select * from src5 order by key, value
+POSTHOOK: type: CREATETABLE_AS_SELECT
+ABSTRACT SYNTAX TREE:
+  (TOK_CREATETABLE (TOK_TABNAME total_ordered) TOK_LIKETABLE (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src5))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL key)) (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL value))))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-3 depends on stages: Stage-0
+  Stage-2 depends on stages: Stage-3
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src5
+          TableScan
+            alias: src5
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              Reduce Output Operator
+                key expressions:
+                      expr: _col0
+                      type: string
+                      expr: _col1
+                      type: string
+                sort order: ++
+                tag: -1
+                value expressions:
+                      expr: _col0
+                      type: string
+                      expr: _col1
+                      type: string
+      Reduce Operator Tree:
+        Extract
+          File Output Operator
+            compressed: false
+            GlobalTableId: 1
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                name: default.total_ordered
+      Sampling: SAMPLING_ON_START
+
+  Stage: Stage-0
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-3
+      Create Table Operator:
+        Create Table
+          columns: key string, value string
+          if not exists: false
+          input format: org.apache.hadoop.mapred.TextInputFormat
+          # buckets: -1
+          output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
+          name: total_ordered
+          isExternal: false
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+
+PREHOOK: query: create table total_ordered as select * from src5 order by key, value
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src5
+POSTHOOK: query: create table total_ordered as select * from src5 order by key, value
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src5
+POSTHOOK: Output: default@total_ordered
+PREHOOK: query: desc formatted total_ordered
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: desc formatted total_ordered
+POSTHOOK: type: DESCTABLE
+# col_name	data_type	comment
+
+key	string	None
+value	string	None
+
+# Detailed Table Information
+Database:	default
+#### A masked pattern was here ####
+Protect Mode:	None
+Retention:	0
+#### A masked pattern was here ####
+Table Type:	MANAGED_TABLE
+Table Parameters:
+	numFiles	4
+	numPartitions	0
+	numRows	0
+	rawDataSize	0
+	totalSize	560
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat:	org.apache.hadoop.mapred.TextInputFormat
+OutputFormat:	org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed:	No
+Num Buckets:	-1
+Bucket Columns:	[]
+Sort Columns:	[]
+Storage Desc Params:
+	serialization.format	1
+PREHOOK: query: select * from total_ordered
+PREHOOK: type: QUERY
+PREHOOK: Input: default@total_ordered
+#### A masked pattern was here ####
+POSTHOOK: query: select * from total_ordered
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@total_ordered
+#### A masked pattern was here ####
+128	val_128
+128	val_128
+150	val_150
+150	val_150
+165	val_165
+165	val_165
+193	val_193
+193	val_193
+213	val_213
+213	val_213
+213	val_213
+213	val_213
+213	val_214
+213	val_214
+224	val_224
+224	val_224
+238	val_238
+238	val_238
+238	val_239
+238	val_239
+238	val_240
+238	val_240
+255	val_255
+255	val_255
+265	val_265
+265	val_265
+27	val_27
+27	val_27
+273	val_273
+273	val_273
+278	val_278
+278	val_278
+311	val_311
+311	val_311
+369	val_369
+369	val_369
+401	val_401
+401	val_401
+409	val_409
+409	val_409
+484	val_484
+484	val_484
+66	val_66
+66	val_66
+86	val_86
+86	val_86
+98	val_98
+98	val_98
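
Note (not part of the patch): for readers unfamiliar with total-order partitioning, the following minimal, self-contained sketch illustrates the cut-point selection that PartitionKeySampler.getPartitionKeys performs, using plain strings instead of serialized HiveKeys; the class and data here are illustrative only.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch: sort the sampled keys and pick numReduce - 1 evenly
// spaced keys as range boundaries, so reducer r receives all keys in
// [cutPoints[r-1], cutPoints[r]) and the concatenated reducer outputs are
// globally ordered.
public class CutPointDemo {
  public static void main(String[] args) {
    List<String> sampled = Arrays.asList("d", "a", "c", "b", "a", "e", "b", "f");
    int numReduce = 4;

    String[] sorted = sampled.toArray(new String[0]);
    Arrays.sort(sorted);

    String[] cutPoints = new String[numReduce - 1];
    float stepSize = sorted.length / (float) numReduce;
    for (int i = 1; i < numReduce; i++) {
      // pick the sample at each evenly spaced position in the sorted array
      cutPoints[i - 1] = sorted[Math.round(stepSize * i)];
    }
    System.out.println(Arrays.toString(cutPoints)); // e.g. [b, c, e]
  }
}

In the patch itself the same idea is applied to the byte representation of HiveKey, the cut points are written to a ".partitions" SequenceFile, and HiveTotalOrderPartitioner (wrapping Hadoop's TotalOrderPartitioner) reads that file to route each map output key to the right reducer.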