Index: build-common.xml
===================================================================
--- build-common.xml (revision 1495743)
+++ build-common.xml (working copy)
@@ -59,7 +59,7 @@
-
+
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1495743)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -560,6 +560,11 @@
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
HIVEOPTREDUCEDEDUPLICATIONMINREDUCER("hive.optimize.reducededuplication.min.reducer", 4),
+
+ HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false),
+ HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000),
+ HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f),
+
// whether to optimize union followed by select followed by filesink
// It creates sub-directories in the final output, so should not be turned on in systems
// where MAPREDUCE-1501 is not present
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -74,6 +74,7 @@
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
@@ -84,6 +85,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.log4j.Appender;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.FileAppender;
@@ -416,6 +418,17 @@
addInputPaths(job, work, emptyScratchDirStr, ctx);
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+
+ if (work.getSamplingType() > 0) {
+ try {
+ handleSampling(driverContext, work, job, new HiveConf(conf));
+ job.setPartitionerClass(HiveTotalOrderPartitioner.class);
+ } catch (Exception e) {
+ LOG.info("Failed to use sampling", e);
+ work.setNumReduceTasks(1); // rollback
+ }
+ }
+
// remove the pwd from conf file so that job tracker doesn't show this
// logs
String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
@@ -511,6 +524,64 @@
return (returnVal);
}
+ private void handleSampling(DriverContext context, MapredWork work, JobConf job, HiveConf conf)
+ throws Exception {
+ assert work.getAliasToWork().keySet().size() == 1;
+
+ String alias = work.getAliases().get(0);
+ Operator<?> topOp = work.getAliasToWork().get(alias);
+ PartitionDesc partDesc = work.getAliasToPartnInfo().get(alias);
+
+ ArrayList<String> paths = work.getPaths();
+ ArrayList<PartitionDesc> parts = work.getPartitionDescs();
+
+ Path onePath = new Path(paths.get(0));
+ String tmpPath = context.getCtx().getExternalTmpFileURI(onePath.toUri());
+
+ Path partitionFile = new Path(tmpPath, ".partitions");
+ TotalOrderPartitioner.setPartitionFile(job, partitionFile);
+
+ PartitionKeySampler sampler = new PartitionKeySampler();
+
+ if (work.getSamplingType() == MapredWork.SAMPLING_ON_PREV_MR) {
+ console.printInfo("Use sampling data created in previous MR");
+ // merge sampling data from the previous MR and make partition keys for the total sort
+ for (String path : paths) {
+ Path inputPath = new Path(path);
+ FileSystem fs = inputPath.getFileSystem(job);
+ for (FileStatus status : fs.globStatus(new Path(inputPath, ".sampling*"))) {
+ sampler.addSampleFile(status.getPath(), job);
+ }
+ }
+ } else if (work.getSamplingType() == MapredWork.SAMPLING_ON_START) {
+ console.printInfo("Creating sampling data..");
+ assert topOp instanceof TableScanOperator;
+ TableScanOperator ts = (TableScanOperator) topOp;
+
+ FetchWork fetchWork;
+ if (!partDesc.isPartitioned()) {
+ assert paths.size() == 1;
+ fetchWork = new FetchWork(paths.get(0), partDesc.getTableDesc());
+ } else {
+ fetchWork = new FetchWork(paths, parts, partDesc.getTableDesc());
+ }
+ fetchWork.setSource(ts);
+
+ // random sampling
+ FetchOperator fetcher = PartitionKeySampler.createSampler(fetchWork, conf, job, ts);
+ try {
+ ts.initialize(conf, new ObjectInspector[]{fetcher.getOutputObjectInspector()});
+ ts.setOutputCollector(sampler);
+ while (fetcher.pushRow()) { }
+ } finally {
+ fetcher.clearFetchContext();
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid sampling type " + work.getSamplingType());
+ }
+ sampler.writePartitionKeys(partitionFile, job);
+ }
+
/**
* Set hive input format, and input format file if necessary.
*/
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (working copy)
@@ -487,11 +487,15 @@
public boolean pushRow() throws IOException, HiveException {
InspectableObject row = getNextRow();
if (row != null) {
- operator.process(row.o, 0);
+ pushRow(row);
}
return row != null;
}
+ protected void pushRow(InspectableObject row) throws HiveException {
+ operator.process(row.o, 0);
+ }
+
private transient final InspectableObject inspectable = new InspectableObject();
/**
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java (working copy)
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+
+public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object> {
+
+ private Partitioner<BytesWritable, Object> partitioner
+ = new TotalOrderPartitioner<BytesWritable, Object>();
+
+ public void configure(JobConf job) {
+ JobConf newconf = new JobConf(job);
+ newconf.setMapOutputKeyClass(BytesWritable.class);
+ partitioner.configure(newconf);
+ }
+
+ public int getPartition(HiveKey key, Object value, int numPartitions) {
+ return partitioner.getPartition(key, value, numPartitions);
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (working copy)
@@ -1,4 +1,6 @@
/**
+ * Copyright 2010 The Apache Software Foundation
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,6 +33,11 @@
return findOperator(start, clazz, new HashSet<T>());
}
+ public static <T> T findSingleOperator(Operator<?> start, Class<T> clazz) {
+ Set<T> found = findOperator(start, clazz, new HashSet<T>());
+ return found.size() == 1 ? found.iterator().next() : null;
+ }
+
public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
Set<T> found = new HashSet<T>();
for (Operator<?> start : starts) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (working copy)
@@ -0,0 +1,156 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
+public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
+
+ public static final Comparator<byte[]> C = new Comparator<byte[]>() {
+ public final int compare(byte[] o1, byte[] o2) {
+ return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
+ }
+ };
+
+ private List<byte[]> sampled = new ArrayList<byte[]>();
+
+ public void addSampleFile(Path inputPath, JobConf job) throws IOException {
+ FileSystem fs = inputPath.getFileSystem(job);
+ FSDataInputStream input = fs.open(inputPath);
+ try {
+ int count = input.readInt();
+ for (int i = 0; i < count; i++) {
+ byte[] key = new byte[input.readInt()];
+ input.readFully(key);
+ sampled.add(key);
+ }
+ } finally {
+ IOUtils.closeStream(input);
+ }
+ }
+
+ // keys from FetchSampler are collected here
+ public void collect(HiveKey key, Object value) throws IOException {
+ sampled.add(Arrays.copyOfRange(key.getBytes(), 0, key.getLength()));
+ }
+
+ // sort and pick partition keys
+ // copied from org.apache.hadoop.mapred.lib.InputSampler
+ private byte[][] getPartitionKeys(int numReduce) {
+ if (sampled.size() < numReduce - 1) {
+ throw new IllegalStateException("not enough samples");
+ }
+ byte[][] sorted = sampled.toArray(new byte[sampled.size()][]);
+ Arrays.sort(sorted, C);
+ byte[][] partitionKeys = new byte[numReduce - 1][];
+ float stepSize = sorted.length / (float) numReduce;
+ int last = -1;
+ for(int i = 1; i < numReduce; ++i) {
+ int k = Math.round(stepSize * i);
+ while (last >= k && C.compare(sorted[last], sorted[k]) == 0) {
+ k++;
+ }
+ partitionKeys[i - 1] = sorted[k];
+ last = k;
+ }
+ return partitionKeys;
+ }
+
+ public void writePartitionKeys(Path path, JobConf job) throws IOException {
+ byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks());
+
+ FileSystem fs = path.getFileSystem(job);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, path,
+ BytesWritable.class, NullWritable.class);
+ try {
+ for (byte[] pkey : partitionKeys) {
+ BytesWritable wrapper = new BytesWritable(pkey);
+ writer.append(wrapper, NullWritable.get());
+ }
+ } finally {
+ IOUtils.closeStream(writer);
+ }
+ }
+
+ // random sampling
+ public static FetchSampler createSampler(FetchWork work, HiveConf conf, JobConf job,
+ Operator<?> operator) {
+ int sampleNum = conf.getIntVar(HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY);
+ float samplePercent = conf.getFloatVar(HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY);
+ if (samplePercent < 0.0 || samplePercent > 1.0) {
+ throw new RuntimeException("Percentile value must be within the range of 0 to 1.");
+ }
+ FetchSampler sampler = new FetchSampler(work, job, operator);
+ sampler.setSampleNum(sampleNum);
+ sampler.setSamplePercent(samplePercent);
+ return sampler;
+ }
+
+ private static class FetchSampler extends FetchOperator {
+
+ private int sampleNum = 1000;
+ private float samplePercent = 0.1f;
+ private final Random random = new Random();
+
+ public FetchSampler(FetchWork work, JobConf job, Operator<?> operator) {
+ super(work, job, operator, null);
+ }
+
+ public void setSampleNum(int numSample) {
+ this.sampleNum = numSample;
+ }
+
+ public void setSamplePercent(float samplePercent) {
+ this.samplePercent = samplePercent;
+ }
+
+ @Override
+ public boolean pushRow() throws IOException, HiveException {
+ InspectableObject row = getNextRow();
+ if (row != null && random.nextFloat() < samplePercent) {
+ sampleNum--;
+ pushRow(row);
+ }
+ return sampleNum > 0 && row != null;
+ }
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java (working copy)
@@ -36,6 +36,8 @@
*/
public class BucketingSortingCtx implements NodeProcessorCtx {
+ boolean disableBucketing;
+
// A mapping from an operator to the columns by which it's output is bucketed
Map<Operator<? extends OperatorDesc>, List<BucketCol>> bucketedColsByOp;
// A mapping from a directory which a FileSinkOperator writes into to the columns by which that
@@ -48,7 +50,8 @@
// output is sorted
Map<String, List<SortCol>> sortedColsByDirectory;
- public BucketingSortingCtx() {
+ public BucketingSortingCtx(boolean disableBucketing) {
+ this.disableBucketing = disableBucketing;
this.bucketedColsByOp = new HashMap<Operator<? extends OperatorDesc>, List<BucketCol>>();
this.bucketedColsByDirectory = new HashMap<String, List<BucketCol>>();
this.sortedColsByOp = new HashMap<Operator<? extends OperatorDesc>, List<SortCol>>();
@@ -57,21 +60,25 @@
public List<BucketCol> getBucketedCols(Operator<? extends OperatorDesc> op) {
- return bucketedColsByOp.get(op);
+ return disableBucketing ? null : bucketedColsByOp.get(op);
}
public void setBucketedCols(Operator<? extends OperatorDesc> op, List<BucketCol> bucketCols) {
- this.bucketedColsByOp.put(op, bucketCols);
+ if (!disableBucketing) {
+ bucketedColsByOp.put(op, bucketCols);
+ }
}
public Map<String, List<BucketCol>> getBucketedColsByDirectory() {
- return bucketedColsByDirectory;
+ return disableBucketing ? null : bucketedColsByDirectory;
}
public void setBucketedColsByDirectory(Map<String, List<BucketCol>> bucketedColsByDirectory) {
- this.bucketedColsByDirectory = bucketedColsByDirectory;
+ if (!disableBucketing) {
+ this.bucketedColsByDirectory = bucketedColsByDirectory;
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java (working copy)
@@ -91,7 +91,9 @@
continue;
}
- BucketingSortingCtx bCtx = new BucketingSortingCtx();
+ // uses sampling, which means it's not bucketed
+ boolean disableBucketing = mapRedTask.getWork().getSamplingType() > 0;
+ BucketingSortingCtx bCtx = new BucketingSortingCtx(disableBucketing);
// RuleRegExp rules are used to match operators anywhere in the tree
// RuleExactMatch rules are used to specify exactly what the tree should look like
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/IndexWhereResolver.java (working copy)
@@ -34,7 +34,7 @@
Dispatcher dispatcher = new IndexWhereTaskDispatcher(physicalContext);
GraphWalker opGraphWalker = new DefaultGraphWalker(dispatcher);
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(physicalContext.rootTasks);
+ topNodes.addAll(physicalContext.getRootTasks());
opGraphWalker.startWalking(topNodes, null);
return physicalContext;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (working copy)
@@ -73,7 +73,7 @@
// get all the tasks nodes from root task
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.rootTasks);
+ topNodes.addAll(pctx.getRootTasks());
// begin to walk through the task tree.
ogw.startWalking(topNodes, null);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java (working copy)
@@ -171,7 +171,7 @@
Dispatcher disp = new MetadataOnlyTaskDispatcher(pctx);
GraphWalker ogw = new DefaultGraphWalker(disp);
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.rootTasks);
+ topNodes.addAll(pctx.getRootTasks());
ogw.startWalking(topNodes, null);
return pctx;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java (working copy)
@@ -72,11 +72,27 @@
this.context = context;
}
+ public List<Task<? extends Serializable>> getRootTasks() {
+ return rootTasks;
+ }
+
+ public void setRootTasks(List<Task<? extends Serializable>> rootTasks) {
+ this.rootTasks = rootTasks;
+ }
+
+ public Task<? extends Serializable> getFetchTask() {
+ return fetchTask;
+ }
+
+ public void setFetchTask(Task<? extends Serializable> fetchTask) {
+ this.fetchTask = fetchTask;
+ }
+
public void addToRootTask(Task<? extends Serializable> tsk){
rootTasks.add(tsk);
}
+
public void removeFromRootTask(Task<? extends Serializable> tsk){
rootTasks.remove(tsk);
}
-
}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (working copy)
@@ -67,6 +67,9 @@
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES)) {
resolvers.add(new MetadataOnlyOptimizer());
}
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY)) {
+ resolvers.add(new SamplingOptimizer());
+ }
// Physical optimizers which follow this need to be careful not to invalidate the inferences
// made by this optimizer. Only optimizers which depend on the results of this one should
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java (working copy)
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+
+/**
+ * Mark the final MapredWork of an ORDER BY query to use sampling and set its number of reduce tasks to -1
+ */
+public class SamplingOptimizer implements PhysicalPlanResolver {
+
+ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+ for (Task<?> task : pctx.getRootTasks()) {
+ if (!(task instanceof MapRedTask) || !((MapRedTask)task).getWork().isFinalMapRed()) {
+ continue; // this could be replaced by bucketing on RS + bucketed fetcher for next MR
+ }
+ MapredWork mapreWork = ((MapRedTask) task).getWork();
+ if (mapreWork.getNumReduceTasks() != 1 || mapreWork.getAliasToWork().size() != 1 ||
+ mapreWork.getSamplingType() > 0 || mapreWork.getReducer() == null) {
+ continue;
+ }
+ Operator<?> operator = mapreWork.getAliasToWork().values().iterator().next();
+ if (!(operator instanceof TableScanOperator)) {
+ continue;
+ }
+ ReduceSinkOperator child =
+ OperatorUtils.findSingleOperator(operator, ReduceSinkOperator.class);
+ if (child == null ||
+ child.getConf().getNumReducers() != 1 || !child.getConf().getPartitionCols().isEmpty()) {
+ continue;
+ }
+ child.getConf().setNumReducers(-1);
+ mapreWork.setNumReduceTasks(-1);
+ mapreWork.setSamplingType(MapredWork.SAMPLING_ON_START);
+ }
+ return pctx;
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java (working copy)
@@ -51,7 +51,7 @@
Dispatcher disp = new SkewJoinTaskDispatcher(pctx);
GraphWalker ogw = new DefaultGraphWalker(disp);
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.rootTasks);
+ topNodes.addAll(pctx.getRootTasks());
ogw.startWalking(topNodes, null);
return pctx;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy)
@@ -112,6 +112,12 @@
private final Map<String, List<SortCol>> sortedColsByDirectory =
new HashMap<String, List<SortCol>>();
+ // use sampled partitioning
+ private int samplingType;
+
+ public static final int SAMPLING_ON_PREV_MR = 1; // todo HIVE-3841
+ public static final int SAMPLING_ON_START = 2; // sampling on task running
+
public MapredWork() {
aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
}
@@ -235,6 +241,22 @@
}
}
+ public ArrayList<String> getAliases() {
+ return new ArrayList<String>(aliasToWork.keySet());
+ }
+
+ public ArrayList<Operator<?>> getWorks() {
+ return new ArrayList<Operator<?>>(aliasToWork.values());
+ }
+
+ public ArrayList<String> getPaths() {
+ return new ArrayList<String>(pathToAliases.keySet());
+ }
+
+ public ArrayList<PartitionDesc> getPartitionDescs() {
+ return new ArrayList<PartitionDesc>(aliasToPartnInfo.values());
+ }
+
/**
* @return the mapredLocalWork
*/
@@ -594,4 +616,18 @@
}
}
}
+
+ public int getSamplingType() {
+ return samplingType;
+ }
+
+ public void setSamplingType(int samplingType) {
+ this.samplingType = samplingType;
+ }
+
+ @Explain(displayName = "Sampling")
+ public String getSamplingTypeString() {
+ return samplingType == 1 ? "SAMPLING_ON_PREV_MR" :
+ samplingType == 2 ? "SAMPLING_ON_START" : null;
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java (working copy)
@@ -242,6 +242,10 @@
return baseFileName;
}
+ public boolean isPartitioned() {
+ return partSpec != null && !partSpec.isEmpty();
+ }
+
@Override
public PartitionDesc clone() {
PartitionDesc ret = new PartitionDesc();
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java (revision 1495743)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java (working copy)
@@ -149,7 +149,7 @@
}
if (state.percentiles == null) {
if (percentile < 0.0 || percentile > 1.0) {
- throw new RuntimeException("Percentile value must be wihin the range of 0 to 1.");
+ throw new RuntimeException("Percentile value must be within the range of 0 to 1.");
}
state.percentiles = new ArrayList<DoubleWritable>(1);
state.percentiles.add(new DoubleWritable(percentile.doubleValue()));
@@ -238,7 +238,7 @@
if(percentiles != null) {
for (int i = 0; i < percentiles.size(); i++) {
if (percentiles.get(i).get() < 0.0 || percentiles.get(i).get() > 1.0) {
- throw new RuntimeException("Percentile value must be wihin the range of 0 to 1.");
+ throw new RuntimeException("Percentile value must be within the range of 0 to 1.");
}
}
state.percentiles = new ArrayList<DoubleWritable>(percentiles);
Index: ql/src/test/queries/clientpositive/parallel_orderby.q
===================================================================
--- ql/src/test/queries/clientpositive/parallel_orderby.q (revision 0)
+++ ql/src/test/queries/clientpositive/parallel_orderby.q (working copy)
@@ -0,0 +1,14 @@
+create table src5 (key string, value string);
+load data local inpath '../data/files/kv5.txt' into table src5;
+load data local inpath '../data/files/kv5.txt' into table src5;
+
+set mapred.reduce.tasks = 4;
+set hive.optimize.sampling.orderby=true;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+explain
+create table total_ordered as select * from src5 order by key, value;
+create table total_ordered as select * from src5 order by key, value;
+
+desc formatted total_ordered;
+select * from total_ordered;
Index: ql/src/test/results/clientpositive/parallel_orderby.q.out
===================================================================
--- ql/src/test/results/clientpositive/parallel_orderby.q.out (revision 0)
+++ ql/src/test/results/clientpositive/parallel_orderby.q.out (working copy)
@@ -0,0 +1,188 @@
+PREHOOK: query: create table src5 (key string, value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table src5 (key string, value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@src5
+PREHOOK: query: load data local inpath '../data/files/kv5.txt' into table src5
+PREHOOK: type: LOAD
+PREHOOK: Output: default@src5
+POSTHOOK: query: load data local inpath '../data/files/kv5.txt' into table src5
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@src5
+PREHOOK: query: load data local inpath '../data/files/kv5.txt' into table src5
+PREHOOK: type: LOAD
+PREHOOK: Output: default@src5
+POSTHOOK: query: load data local inpath '../data/files/kv5.txt' into table src5
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@src5
+PREHOOK: query: explain
+create table total_ordered as select * from src5 order by key, value
+PREHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: query: explain
+create table total_ordered as select * from src5 order by key, value
+POSTHOOK: type: CREATETABLE_AS_SELECT
+ABSTRACT SYNTAX TREE:
+ (TOK_CREATETABLE (TOK_TABNAME total_ordered) TOK_LIKETABLE (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src5))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL key)) (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL value))))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+ Stage-3 depends on stages: Stage-0
+ Stage-2 depends on stages: Stage-3
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src5
+ TableScan
+ alias: src5
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ sort order: ++
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ Reduce Operator Tree:
+ Extract
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ name: default.total_ordered
+ Sampling: SAMPLING_ON_START
+
+ Stage: Stage-0
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+ Stage: Stage-3
+ Create Table Operator:
+ Create Table
+ columns: key string, value string
+ if not exists: false
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ # buckets: -1
+ output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
+ name: total_ordered
+ isExternal: false
+
+ Stage: Stage-2
+ Stats-Aggr Operator
+
+
+PREHOOK: query: create table total_ordered as select * from src5 order by key, value
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src5
+POSTHOOK: query: create table total_ordered as select * from src5 order by key, value
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src5
+POSTHOOK: Output: default@total_ordered
+PREHOOK: query: desc formatted total_ordered
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: desc formatted total_ordered
+POSTHOOK: type: DESCTABLE
+# col_name data_type comment
+
+key string None
+value string None
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Protect Mode: None
+Retention: 0
+#### A masked pattern was here ####
+Table Type: MANAGED_TABLE
+Table Parameters:
+ numFiles 4
+ numPartitions 0
+ numRows 0
+ rawDataSize 0
+ totalSize 560
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat: org.apache.hadoop.mapred.TextInputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed: No
+Num Buckets: -1
+Bucket Columns: []
+Sort Columns: []
+Storage Desc Params:
+ serialization.format 1
+PREHOOK: query: select * from total_ordered
+PREHOOK: type: QUERY
+PREHOOK: Input: default@total_ordered
+#### A masked pattern was here ####
+POSTHOOK: query: select * from total_ordered
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@total_ordered
+#### A masked pattern was here ####
+128 val_128
+128 val_128
+150 val_150
+150 val_150
+165 val_165
+165 val_165
+193 val_193
+193 val_193
+213 val_213
+213 val_213
+213 val_213
+213 val_213
+213 val_214
+213 val_214
+224 val_224
+224 val_224
+238 val_238
+238 val_238
+238 val_239
+238 val_239
+238 val_240
+238 val_240
+255 val_255
+255 val_255
+265 val_265
+265 val_265
+27 val_27
+27 val_27
+273 val_273
+273 val_273
+278 val_278
+278 val_278
+311 val_311
+311 val_311
+369 val_369
+369 val_369
+401 val_401
+401 val_401
+409 val_409
+409 val_409
+484 val_484
+484 val_484
+66 val_66
+66 val_66
+86 val_86
+86 val_86
+98 val_98
+98 val_98