diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 74bb863..c02cd93 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.hive.conf.Validator.PatternSet;
 import org.apache.hadoop.hive.conf.Validator.RangeValidator;
+import org.apache.hadoop.hive.conf.Validator.RatioValidator;
 import org.apache.hadoop.hive.conf.Validator.StringSet;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
@@ -1029,9 +1030,11 @@
         "When enabled dynamic partitioning column will be globally sorted.\n" +
         "This way we can keep only one record writer open for each partition value\n" +
         "in the reducer thereby reducing the memory pressure on reducers."),
-    HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, ""),
-    HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, ""),
-    HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, ""),
+
+    HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false, "Uses sampling on order-by clause for parallel execution."),
+    HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000, "Total number of samples to be obtained."),
+    HIVESAMPLINGPERCENTFORORDERBY("hive.optimize.sampling.orderby.percent", 0.1f, new RatioValidator(),
+        "Probability with which a row will be chosen."),
 
     // whether to optimize union followed by select followed by filesink
     // It creates sub-directories in the final output, so should not be turned on in systems
diff --git common/src/java/org/apache/hadoop/hive/conf/Validator.java common/src/java/org/apache/hadoop/hive/conf/Validator.java
index cea9c41..14f9da3 100644
--- common/src/java/org/apache/hadoop/hive/conf/Validator.java
+++ common/src/java/org/apache/hadoop/hive/conf/Validator.java
@@ -147,7 +147,7 @@ public String validate(String value) {
     public String validate(String value) {
       try {
         float fvalue = Float.valueOf(value);
-        if (fvalue <= 0 || fvalue >= 1) {
+        if (fvalue < 0 || fvalue > 1) {
           return "Invalid ratio " + value + ", which should be in between 0 to 1";
         }
       } catch (NumberFormatException e) {
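The RatioValidator change above turns the exclusive bounds check into an inclusive one, so 0 and 1 become acceptable values for hive.optimize.sampling.orderby.percent. A minimal standalone sketch of the patched check follows; the class name, the catch-branch message, and the main driver are illustrative, not the actual Hive Validator source.

    // Sketch of the patched ratio check: returns an error string for bad values, null when accepted.
    public class RatioCheckSketch {
      static String validate(String value) {
        try {
          float fvalue = Float.valueOf(value);
          if (fvalue < 0 || fvalue > 1) {        // inclusive bounds after the patch
            return "Invalid ratio " + value + ", which should be in between 0 to 1";
          }
        } catch (NumberFormatException e) {
          return "Invalid ratio " + value;       // illustrative message for unparsable input
        }
        return null;                             // null means the value passed validation
      }

      public static void main(String[] args) {
        System.out.println(validate("0"));       // null (was rejected before the patch)
        System.out.println(validate("0.1"));     // null
        System.out.println(validate("1"));       // null (was rejected before the patch)
        System.out.println(validate("1.5"));     // error message
      }
    }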
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
index 6c22362..01a67e0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
@@ -20,24 +20,50 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
 
-public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object> {
+public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object>, Configurable {
 
-  private Partitioner<BytesWritable, Object> partitioner
-      = new TotalOrderPartitioner<BytesWritable, Object>();
+  private static final Log LOG = LogFactory.getLog(HiveTotalOrderPartitioner.class);
 
+  private Partitioner<BytesWritable, Object> partitioner;
+
+  @Override
   public void configure(JobConf job) {
-    JobConf newconf = new JobConf(job);
-    newconf.setMapOutputKeyClass(BytesWritable.class);
-    partitioner.configure(newconf);
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(job));
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    // workaround for TEZ-1403
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(conf));
+    }
   }
 
   public int getPartition(HiveKey key, Object value, int numPartitions) {
     return partitioner.getPartition(key, value, numPartitions);
   }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  private void configurePartitioner(JobConf conf) {
+    LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
+    conf.setMapOutputKeyClass(BytesWritable.class);
+    partitioner = new TotalOrderPartitioner<BytesWritable, Object>();
+    partitioner.configure(conf);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
index 166461a..5d126a5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
@@ -27,6 +29,8 @@
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +47,8 @@
 
 public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
 
+  private static final Log LOG = LogFactory.getLog(PartitionKeySampler.class);
+
   public static final Comparator<byte[]> C = new Comparator<byte[]>() {
     public final int compare(byte[] o1, byte[] o2) {
       return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
@@ -74,32 +78,46 @@ public void collect(HiveKey key, Object value) throws IOException {
   }
 
   // sort and pick partition keys
-  // copied from org.apache.hadoop.mapred.lib.InputSampler
+  // originally copied from org.apache.hadoop.mapred.lib.InputSampler but seemed to have a bug
   private byte[][] getPartitionKeys(int numReduce) {
     if (sampled.size() < numReduce - 1) {
       throw new IllegalStateException("not enough number of sample");
     }
     byte[][] sorted = sampled.toArray(new byte[sampled.size()][]);
     Arrays.sort(sorted, C);
-    byte[][] partitionKeys = new byte[numReduce - 1][];
-    float stepSize = sorted.length / (float) numReduce;
-    int last = -1;
-    for(int i = 1; i < numReduce; ++i) {
-      int k = Math.round(stepSize * i);
-      while (last >= k && C.compare(sorted[last], sorted[k]) == 0) {
-        k++;
+
+    return toPartitionKeys(sorted, numReduce);
+  }
+
+  static final byte[][] toPartitionKeys(byte[][] sorted, int numPartition) {
+    byte[][] partitionKeys = new byte[numPartition - 1][];
+
+    int last = 0;
+    int current = 0;
+    for(int i = 0; i < numPartition - 1; i++) {
+      current += Math.round((float)(sorted.length - current) / (numPartition - i));
+      while (i > 0 && current < sorted.length && C.compare(sorted[last], sorted[current]) == 0) {
+        current++;
+      }
+      if (current >= sorted.length) {
+        return Arrays.copyOfRange(partitionKeys, 0, i);
       }
-      if (k >= sorted.length) {
-        throw new IllegalStateException("not enough number of sample");
+      if (LOG.isDebugEnabled()) {
+        // print out nth partition key for debugging
+        LOG.debug("Partition key " + current + "th :" + new BytesWritable(sorted[current]));
       }
-      partitionKeys[i - 1] = sorted[k];
-      last = k;
+      partitionKeys[i] = sorted[current];
+      last = current;
     }
     return partitionKeys;
   }
 
-  public void writePartitionKeys(Path path, JobConf job) throws IOException {
+  public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOException {
     byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks());
+    int numPartition = partitionKeys.length + 1;
+    if (numPartition != job.getNumReduceTasks()) {
+      job.setNumReduceTasks(numPartition);
+    }
     FileSystem fs = path.getFileSystem(job);
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, path,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index ef72039..98cf2a7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -371,7 +371,7 @@ public int execute(DriverContext driverContext) {
 
     Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());
 
-    if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
+    if (mWork.getSamplingType() > 0 && rWork != null && job.getNumReduceTasks() > 1) {
       try {
         handleSampling(driverContext, mWork, job, conf);
         job.setPartitionerClass(HiveTotalOrderPartitioner.class);
@@ -539,7 +539,7 @@ private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf)
     } else {
       throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
     }
-    sampler.writePartitionKeys(partitionFile, job);
+    sampler.writePartitionKeys(partitionFile, conf, job);
   }
 
   /**
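To make the new split-point selection easier to follow, here is a standalone sketch of the same index arithmetic as toPartitionKeys, with byte[] keys simplified to int and the byte comparator replaced by integer comparison; the class and method names are illustrative. Each iteration takes an even share of the samples remaining to the right of the cursor, skips forward past duplicates of the previous split key so the keys handed to TotalOrderPartitioner stay strictly increasing, and truncates the result when the samples run out.

    import java.util.Arrays;

    // Standalone illustration of the split-point selection: picks numPartition - 1 keys
    // from a sorted sample without ever repeating a key.
    public class SplitPointSketch {
      static int[] toPartitionKeys(int[] sorted, int numPartition) {
        int[] keys = new int[numPartition - 1];
        int last = 0;
        int current = 0;
        for (int i = 0; i < numPartition - 1; i++) {
          // take an even share of the samples that remain to the right of 'current'
          current += Math.round((float) (sorted.length - current) / (numPartition - i));
          // skip duplicates of the previous split key so split points stay strictly ordered
          while (i > 0 && current < sorted.length && sorted[last] == sorted[current]) {
            current++;
          }
          if (current >= sorted.length) {
            return Arrays.copyOfRange(keys, 0, i);   // fewer distinct keys than partitions
          }
          keys[i] = sorted[current];
          last = current;
        }
        return keys;
      }

      public static void main(String[] args) {
        // 10 samples, 4 reducers: indices 3, 5, 8 are chosen, i.e. keys 100, 200, 300
        System.out.println(Arrays.toString(
            toPartitionKeys(new int[] {100, 100, 100, 100, 200, 200, 200, 300, 300, 300}, 4)));
        // all samples equal: only one usable split key remains
        System.out.println(Arrays.toString(
            toPartitionKeys(new int[] {100, 100, 100, 100, 100, 100, 100, 100, 100, 100}, 4)));
      }
    }

The truncated case is what lets writePartitionKeys shrink the reducer count instead of failing with "Split points are out of order".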
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java
new file mode 100644
index 0000000..e4634f4
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestPartitionKeySampler.java
@@ -0,0 +1,52 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import junit.framework.TestCase;
+
+import java.util.Arrays;
+
+public class TestPartitionKeySampler extends TestCase {
+
+  private static final byte[] _100 = "100".getBytes();
+  private static final byte[] _200 = "200".getBytes();
+  private static final byte[] _300 = "300".getBytes();
+  private static final byte[] _400 = "400".getBytes();
+
+  // The sampling logic copied from InputSampler always picks the values at index 3, 5 and 8,
+  // which can be the same as the previous partition key. That induces the "Split points are
+  // out of order" exception in TotalOrderPartitioner, causing HIVE-7699.
+  public void test() throws Throwable {
+    byte[][] sampled;
+
+    sampled = new byte[][] {
+        _100, _100, _100, _100, _100, _100, _100, _100, _100, _100
+    };
+    assertKeys(sampled, _100);   // 3
+
+    sampled = new byte[][] {
+        _100, _100, _100, _100, _100, _100, _100, _100, _200, _200
+    };
+    assertKeys(sampled, _100, _200);   // 3, 8
+
+    sampled = new byte[][] {
+        _100, _100, _100, _100, _200, _200, _200, _300, _300, _300
+    };
+    assertKeys(sampled, _100, _200, _300);   // 3, 5, 8
+
+    sampled = new byte[][] {
+        _100, _200, _200, _200, _200, _200, _200, _300, _300, _400
+    };
+    assertKeys(sampled, _200, _300, _400);   // 3, 7, 9
+
+    sampled = new byte[][] {
+        _100, _200, _300, _400, _400, _400, _400, _400, _400, _400
+    };
+    assertKeys(sampled, _400);   // 3
+  }
+
+  private void assertKeys(byte[][] sampled, byte[]... expected) {
+    byte[][] keys = PartitionKeySampler.toPartitionKeys(sampled, 4);
+    assertEquals(expected.length, keys.length);
+    for (int i = 0; i < expected.length; i++) {
+      assertTrue(Arrays.equals(expected[i], keys[i]));
+    }
+  }
+}
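For reference, a session along the following lines should exercise the patched path; the property names come from the HiveConf entries above, while the table, query, and reducer count are only illustrative.

  set hive.optimize.sampling.orderby=true;
  set hive.optimize.sampling.orderby.number=1000;
  set hive.optimize.sampling.orderby.percent=0.1;
  set mapred.reduce.tasks=4;
  select key, value from src order by key;

With sampling enabled and more than one reducer, ExecDriver calls handleSampling(), writePartitionKeys() stores the chosen split points (lowering the reducer count when there are too few distinct keys), and HiveTotalOrderPartitioner is installed so each reducer receives a contiguous key range.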