diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8490558..167c731 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1009,9 +1009,12 @@
         "When enabled dynamic partitioning column will be globally sorted.\n" +
         "This way we can keep only one record writer open for each partition value\n" +
         "in the reducer thereby reducing the memory pressure on reducers."),
-    HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, ""),
-    HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, ""),
-    HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, ""),
+
+    HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, "Uses sampling on order-by clause for parallel execution."),
+    HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, "Total number of samples to be obtained."),
+    HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, "Probability with which a row will be chosen."),
+    HIVESAMPLINGFORORDERBYMINREDUCER("hive.optimize.sampling.orderby.min.reducer", 0.5f, new RatioValidator(),
+        "Minimum ratio of the number of reducers determined by sampling to the number of expected reducers."),

     // whether to optimize union followed by select followed by filesink
     // It creates sub-directories in the final output, so should not be turned on in systems
diff --git common/src/java/org/apache/hadoop/hive/conf/Validator.java common/src/java/org/apache/hadoop/hive/conf/Validator.java
index cea9c41..14f9da3 100644
--- common/src/java/org/apache/hadoop/hive/conf/Validator.java
+++ common/src/java/org/apache/hadoop/hive/conf/Validator.java
@@ -147,7 +147,7 @@ public String validate(String value) {
     public String validate(String value) {
       try {
         float fvalue = Float.valueOf(value);
-        if (fvalue <= 0 || fvalue >= 1) {
+        if (fvalue < 0 || fvalue > 1) {
           return "Invalid ratio " + value + ", which should be in between 0 to 1";
         }
       } catch (NumberFormatException e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
index 6c22362..01a67e0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
@@ -20,24 +20,50 @@
 package org.apache.hadoop.hive.ql.exec;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

-public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object> {
+public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object>, Configurable {

-  private Partitioner<BytesWritable, Object> partitioner
-      = new TotalOrderPartitioner<BytesWritable, Object>();
+  private static final Log LOG = LogFactory.getLog(HiveTotalOrderPartitioner.class);
+  private Partitioner<BytesWritable, Object> partitioner;
+
+  @Override
   public void configure(JobConf job) {
-    JobConf newconf = new JobConf(job);
-    newconf.setMapOutputKeyClass(BytesWritable.class);
-    partitioner.configure(newconf);
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(job));
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    // workaround for TEZ-1403
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(conf));
+    }
   }

   public int getPartition(HiveKey key, Object value, int numPartitions) {
     return partitioner.getPartition(key, value, numPartitions);
   }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  private void configurePartitioner(JobConf conf) {
+    LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
+    conf.setMapOutputKeyClass(BytesWritable.class);
+    partitioner = new TotalOrderPartitioner<BytesWritable, Object>();
+    partitioner.configure(conf);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
index 166461a..d88c7b8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
@@ -27,6 +27,8 @@
 import java.util.List;
 import java.util.Random;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +47,8 @@
 public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {

+  private static final Log LOG = LogFactory.getLog(PartitionKeySampler.class);
+
   public static final Comparator<byte[]> C = new Comparator<byte[]>() {
     public final int compare(byte[] o1, byte[] o2) {
       return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
@@ -74,32 +78,46 @@ public void collect(HiveKey key, Object value) throws IOException {
   }

   // sort and pick partition keys
-  // copied from org.apache.hadoop.mapred.lib.InputSampler
+  // originally copied from org.apache.hadoop.mapred.lib.InputSampler, which seemed to have a bug
   private byte[][] getPartitionKeys(int numReduce) {
     if (sampled.size() < numReduce - 1) {
       throw new IllegalStateException("not enough number of sample");
     }
     byte[][] sorted = sampled.toArray(new byte[sampled.size()][]);
     Arrays.sort(sorted, C);
+
     byte[][] partitionKeys = new byte[numReduce - 1][];
-    float stepSize = sorted.length / (float) numReduce;
-    int last = -1;
-    for(int i = 1; i < numReduce; ++i) {
-      int k = Math.round(stepSize * i);
-      while (last >= k && C.compare(sorted[last], sorted[k]) == 0) {
-        k++;
+
+    int last = 0;
+    int current = 0;
+    for(int i = 0; i < numReduce - 1; i++) {
+      current += Math.round((float)(sorted.length - current) / (numReduce - i));
+      while (i > 0 && current < sorted.length && C.compare(sorted[last], sorted[current]) == 0) {
+        current++;
       }
-      if (k >= sorted.length) {
-        throw new IllegalStateException("not enough number of sample");
+      if (current >= sorted.length) {
+        return Arrays.copyOfRange(partitionKeys, 0, i);
       }
-      partitionKeys[i - 1] = sorted[k];
-      last = k;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(current + ":" + new BytesWritable(sorted[current]));
+      }
+      partitionKeys[i] = sorted[current];
+      last = current;
     }
     return partitionKeys;
   }

-  public void writePartitionKeys(Path path, JobConf job) throws IOException {
+  public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOException {
     byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks());
+    if (partitionKeys.length != job.getNumReduceTasks() - 1) {
+      float minRatio = conf.getFloatVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBYMINREDUCER);
+      if ((partitionKeys.length + 1.0f) / job.getNumReduceTasks() < minRatio) {
+        throw new IllegalStateException("not enough reducers determined by sampling: "
+            + (partitionKeys.length + 1) + ", expected at least "
+            + (int) (job.getNumReduceTasks() * minRatio));
+      }
+      job.setNumReduceTasks(partitionKeys.length + 1);
+    }
     FileSystem fs = path.getFileSystem(job);
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, path,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index ef72039..98cf2a7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -371,7 +371,7 @@ public int execute(DriverContext driverContext) {

       Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());

-      if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
+      if (mWork.getSamplingType() > 0 && rWork != null && job.getNumReduceTasks() > 1) {
        try {
          handleSampling(driverContext, mWork, job, conf);
          job.setPartitionerClass(HiveTotalOrderPartitioner.class);
@@ -539,7 +539,7 @@ private void handleSampling(DriverContext context, MapWork mWork, JobConf job, H
     } else {
       throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
     }
-    sampler.writePartitionKeys(partitionFile, job);
+    sampler.writePartitionKeys(partitionFile, conf, job);
   }

   /**
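
For reference, a minimal, hypothetical sketch (not part of the patch) of how client code might set the sampling knobs touched by this change. The ConfVars names come from the HiveConf hunk above; the class name and the chosen values are illustrative assumptions only.

import org.apache.hadoop.hive.conf.HiveConf;

public class OrderBySamplingSettingsExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Turn on sampled (parallel) ORDER BY so HiveTotalOrderPartitioner is used
    // instead of funnelling the whole sort through a single reducer.
    conf.setBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY, true);

    // Sampling budget: total number of sampled keys and per-row sampling probability.
    conf.setIntVar(HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY, 1000);
    conf.setFloatVar(HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY, 0.1f);

    // Added by this patch: if sampling yields fewer partition keys than planned,
    // writePartitionKeys shrinks the reducer count, but only down to this ratio of
    // the expected reducers; below that it fails fast with IllegalStateException.
    conf.setFloatVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBYMINREDUCER, 0.5f);
  }
}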