diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java index c66d964f24..4596c02806 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTotalOrderPartitioner.java @@ -35,52 +35,52 @@ public class TezTotalOrderPartitioner implements Partitioner, Configurable { - private static final Logger LOG = LoggerFactory.getLogger(TezTotalOrderPartitioner.class); + private static final Logger LOG = LoggerFactory.getLogger(TezTotalOrderPartitioner.class); - private Partitioner partitioner; + private Partitioner partitioner; - private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework."; - public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS = TEZ_RUNTIME_FRAMEWORK_PREFIX - + "num.expected.partitions"; + private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework."; + public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS = + TEZ_RUNTIME_FRAMEWORK_PREFIX + "num.expected.partitions"; - @Override - public void configure(JobConf job) { - if (partitioner == null) { - configurePartitioner(new JobConf(job)); - } - } + @Override + public void configure(JobConf job) { + if (partitioner == null) { + configurePartitioner(new JobConf(job)); + } + } - @Override - public void setConf(Configuration conf) { - // walk-around of TEZ-1403 - if (partitioner == null) { - configurePartitioner(new JobConf(conf)); - } - } + @Override + public void setConf(Configuration conf) { + // walk-around of TEZ-1403 + if (partitioner == null) { + configurePartitioner(new JobConf(conf)); + } + } - public int getPartition(HiveKey key, Object value, int numPartitions) { - return partitioner.getPartition(key, value, numPartitions); - } + public int getPartition(HiveKey key, Object value, int numPartitions) { + return partitioner.getPartition(key, value, numPartitions); + } - @Override - public Configuration getConf() { - return null; - } + @Override + public Configuration getConf() { + return null; + } - private void configurePartitioner(JobConf conf) { - LOG.info(TotalOrderPartitioner.getPartitionFile(conf)); - // make the HiveKey assumption - conf.setMapOutputKeyClass(HiveKey.class); - LOG.info(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); - // remove the Tez fast serialization factory (TEZ-1288) - // this one skips the len prefix, so that the sorter can assume byte-order == - // sort-order - conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName(), - WritableSerialization.class.getName()); - int partitions = conf.getInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, -1); - // get the tez partitioning and feed it into the MR config - conf.setInt(MRJobConfig.NUM_REDUCES, partitions); - partitioner = new TotalOrderPartitioner(); - partitioner.configure(conf); - } + private void configurePartitioner(JobConf conf) { + LOG.info(TotalOrderPartitioner.getPartitionFile(conf)); + // make the HiveKey assumption + conf.setMapOutputKeyClass(HiveKey.class); + LOG.info(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + // remove the Tez fast serialization factory (TEZ-1288) + // this one skips the len prefix, so that the sorter can assume byte-order == + // sort-order + conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, + JavaSerialization.class.getName(), WritableSerialization.class.getName()); + int partitions = conf.getInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, -1); + // get the tez partitioning and feed it into the MR config + conf.setInt(MRJobConfig.NUM_REDUCES, partitions); + partitioner = new TotalOrderPartitioner(); + partitioner.configure(conf); + } } \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java index 363487ed89..cc16fd40a4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTotalOrderPartitioner.java @@ -48,159 +48,164 @@ public class TestTezTotalOrderPartitioner { - public static final String PARTITIONER_PATH = "mapreduce.totalorderpartitioner.path"; - private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework."; - - public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS = TEZ_RUNTIME_FRAMEWORK_PREFIX + "num.expected.partitions"; - - private static final int LENGTH_BYTES = 4; - - private static final HiveKey[] splitStrings = new HiveKey[] { - // -inf // 0 - new HiveKey("aabbb".getBytes()), // 1 - new HiveKey("babbb".getBytes()), // 2 - new HiveKey("daddd".getBytes()), // 3 - new HiveKey("dddee".getBytes()), // 4 - new HiveKey("ddhee".getBytes()), // 5 - new HiveKey("dingo".getBytes()), // 6 - new HiveKey("hijjj".getBytes()), // 7 - new HiveKey("n".getBytes()), // 8 - new HiveKey("yak".getBytes()), // 9 - }; - - static class Check { - T data; - int part; - - Check(T data, int part) { - this.data = data; - this.part = part; - } - } - - private static final ArrayList> testStrings = new ArrayList>(); - static { - testStrings.add(new Check(new HiveKey("aaaaa".getBytes()), 0)); - testStrings.add(new Check(new HiveKey("aaabb".getBytes()), 0)); - testStrings.add(new Check(new HiveKey("aabbb".getBytes()), 1)); - testStrings.add(new Check(new HiveKey("aaaaa".getBytes()), 0)); - testStrings.add(new Check(new HiveKey("babbb".getBytes()), 2)); - testStrings.add(new Check(new HiveKey("baabb".getBytes()), 1)); - testStrings.add(new Check(new HiveKey("yai".getBytes()), 8)); - testStrings.add(new Check(new HiveKey("yak".getBytes()), 9)); - testStrings.add(new Check(new HiveKey("z".getBytes()), 9)); - testStrings.add(new Check(new HiveKey("ddngo".getBytes()), 5)); - testStrings.add(new Check(new HiveKey("hi".getBytes()), 6)); - }; - - private static Path writePartitionFile(String testname, Configuration conf, T[] splits) throws IOException { - final FileSystem fs = FileSystem.getLocal(conf); - final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(fs.getUri(), - fs.getWorkingDirectory()); - Path p = new Path(testdir, testname + "/_partition.lst"); - conf.set(PARTITIONER_PATH, p.toString()); - conf.setInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, splits.length + 1); - SequenceFile.Writer w = null; - try { - w = SequenceFile.createWriter(conf, SequenceFile.Writer.file(p), - SequenceFile.Writer.keyClass(HiveKey.class), SequenceFile.Writer.valueClass(NullWritable.class), - SequenceFile.Writer.compression(CompressionType.NONE)); - for (int i = 0; i < splits.length; ++i) { - w.append(splits[i], NullWritable.get()); - } - } finally { - if (null != w) - w.close(); - } - return p; - } - - @Test - public void testTotalOrderMemCmp() throws Exception { - TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner(); - Configuration conf = new Configuration(); - Path p = TestTezTotalOrderPartitioner.writePartitionFile("totalordermemcmp", conf, splitStrings); - try { - partitioner.configure(new JobConf(conf)); - NullWritable nw = NullWritable.get(); - for (Check chk : testStrings) { - assertEquals(chk.data.toString(), chk.part, - partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); - } - } finally { - p.getFileSystem(conf).delete(p, true); - } - } - - @Test - public void testTotalOrderBinarySearch() throws Exception { - TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner(); - Configuration conf = new Configuration(); - Path p = TestTezTotalOrderPartitioner.writePartitionFile("totalorderbinarysearch", conf, splitStrings); - conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false); - - try { - partitioner.configure(new JobConf(conf)); - NullWritable nw = NullWritable.get(); - for (Check chk : testStrings) { - assertEquals(chk.data.toString(), chk.part, - partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); - } - } finally { - p.getFileSystem(conf).delete(p, true); - } - } - - - /** A Comparator optimized for HiveKey. */ - public static class ReverseHiveKeyComparator implements RawComparator { - - /** - * Compare the buffers in serialized form. - */ - @Override - public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - int c = -1*WritableComparator.compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES); - return c; - } - - @Override - public int compare(HiveKey o1, HiveKey o2) { - return -o1.compareTo(o2); - } - } - - @Test - public void testTotalOrderCustomComparator() throws Exception { - TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner(); - Configuration conf = new Configuration(); - HiveKey[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length); - Arrays.sort(revSplitStrings, new ReverseHiveKeyComparator()); - Path p = TestTezTotalOrderPartitioner.writePartitionFile("totalordercustomcomparator", conf, - revSplitStrings); - conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false); - conf.setClass(MRJobConfig.KEY_COMPARATOR, ReverseHiveKeyComparator.class, RawComparator.class); - ArrayList> revCheck = new ArrayList>(); - revCheck.add(new Check(new HiveKey("aaaaa".getBytes()), 9)); - revCheck.add(new Check(new HiveKey("aaabb".getBytes()), 9)); - revCheck.add(new Check(new HiveKey("aabbb".getBytes()), 9)); - revCheck.add(new Check(new HiveKey("aaaaa".getBytes()), 9)); - revCheck.add(new Check(new HiveKey("babbb".getBytes()), 8)); - revCheck.add(new Check(new HiveKey("baabb".getBytes()), 8)); - revCheck.add(new Check(new HiveKey("yai".getBytes()), 1)); - revCheck.add(new Check(new HiveKey("yak".getBytes()), 1)); - revCheck.add(new Check(new HiveKey("z".getBytes()), 0)); - revCheck.add(new Check(new HiveKey("ddngo".getBytes()), 4)); - revCheck.add(new Check(new HiveKey("hi".getBytes()), 3)); - try { - partitioner.configure(new JobConf(conf)); - NullWritable nw = NullWritable.get(); - for (Check chk : revCheck) { - assertEquals(chk.data.toString(), chk.part, - partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); - } - } finally { - p.getFileSystem(conf).delete(p, true); - } - } + public static final String PARTITIONER_PATH = "mapreduce.totalorderpartitioner.path"; + private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework."; + + public static final String TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS = + TEZ_RUNTIME_FRAMEWORK_PREFIX + "num.expected.partitions"; + + private static final int LENGTH_BYTES = 4; + + private static final HiveKey[] splitStrings = new HiveKey[] { + // -inf // 0 + new HiveKey("aabbb".getBytes()), // 1 + new HiveKey("babbb".getBytes()), // 2 + new HiveKey("daddd".getBytes()), // 3 + new HiveKey("dddee".getBytes()), // 4 + new HiveKey("ddhee".getBytes()), // 5 + new HiveKey("dingo".getBytes()), // 6 + new HiveKey("hijjj".getBytes()), // 7 + new HiveKey("n".getBytes()), // 8 + new HiveKey("yak".getBytes()), // 9 + }; + + static class Check { + T data; + int part; + + Check(T data, int part) { + this.data = data; + this.part = part; + } + } + + private static final ArrayList> testStrings = new ArrayList>(); + static { + testStrings.add(new Check(new HiveKey("aaaaa".getBytes()), 0)); + testStrings.add(new Check(new HiveKey("aaabb".getBytes()), 0)); + testStrings.add(new Check(new HiveKey("aabbb".getBytes()), 1)); + testStrings.add(new Check(new HiveKey("aaaaa".getBytes()), 0)); + testStrings.add(new Check(new HiveKey("babbb".getBytes()), 2)); + testStrings.add(new Check(new HiveKey("baabb".getBytes()), 1)); + testStrings.add(new Check(new HiveKey("yai".getBytes()), 8)); + testStrings.add(new Check(new HiveKey("yak".getBytes()), 9)); + testStrings.add(new Check(new HiveKey("z".getBytes()), 9)); + testStrings.add(new Check(new HiveKey("ddngo".getBytes()), 5)); + testStrings.add(new Check(new HiveKey("hi".getBytes()), 6)); + }; + + private static Path writePartitionFile(String testname, Configuration conf, T[] splits) + throws IOException { + final FileSystem fs = FileSystem.getLocal(conf); + final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")) + .makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path p = new Path(testdir, testname + "/_partition.lst"); + conf.set(PARTITIONER_PATH, p.toString()); + conf.setInt(TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, splits.length + 1); + SequenceFile.Writer w = null; + try { + w = SequenceFile.createWriter(conf, SequenceFile.Writer.file(p), + SequenceFile.Writer.keyClass(HiveKey.class), + SequenceFile.Writer.valueClass(NullWritable.class), + SequenceFile.Writer.compression(CompressionType.NONE)); + for (int i = 0; i < splits.length; ++i) { + w.append(splits[i], NullWritable.get()); + } + } finally { + if (null != w) + w.close(); + } + return p; + } + + @Test + public void testTotalOrderMemCmp() throws Exception { + TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner(); + Configuration conf = new Configuration(); + Path p = TestTezTotalOrderPartitioner. writePartitionFile("totalordermemcmp", conf, + splitStrings); + try { + partitioner.configure(new JobConf(conf)); + NullWritable nw = NullWritable.get(); + for (Check chk : testStrings) { + assertEquals(chk.data.toString(), chk.part, + partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); + } + } finally { + p.getFileSystem(conf).delete(p, true); + } + } + + @Test + public void testTotalOrderBinarySearch() throws Exception { + TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner(); + Configuration conf = new Configuration(); + Path p = TestTezTotalOrderPartitioner. writePartitionFile("totalorderbinarysearch", + conf, splitStrings); + conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false); + + try { + partitioner.configure(new JobConf(conf)); + NullWritable nw = NullWritable.get(); + for (Check chk : testStrings) { + assertEquals(chk.data.toString(), chk.part, + partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); + } + } finally { + p.getFileSystem(conf).delete(p, true); + } + } + + /** A Comparator optimized for HiveKey. */ + public static class ReverseHiveKeyComparator implements RawComparator { + + /** + * Compare the buffers in serialized form. + */ + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int c = -1 * WritableComparator.compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, + s2 + LENGTH_BYTES, l2 - LENGTH_BYTES); + return c; + } + + @Override + public int compare(HiveKey o1, HiveKey o2) { + return -o1.compareTo(o2); + } + } + + @Test + public void testTotalOrderCustomComparator() throws Exception { + TezTotalOrderPartitioner partitioner = new TezTotalOrderPartitioner(); + Configuration conf = new Configuration(); + HiveKey[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length); + Arrays.sort(revSplitStrings, new ReverseHiveKeyComparator()); + Path p = TestTezTotalOrderPartitioner. writePartitionFile("totalordercustomcomparator", + conf, revSplitStrings); + conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false); + conf.setClass(MRJobConfig.KEY_COMPARATOR, ReverseHiveKeyComparator.class, RawComparator.class); + ArrayList> revCheck = new ArrayList>(); + revCheck.add(new Check(new HiveKey("aaaaa".getBytes()), 9)); + revCheck.add(new Check(new HiveKey("aaabb".getBytes()), 9)); + revCheck.add(new Check(new HiveKey("aabbb".getBytes()), 9)); + revCheck.add(new Check(new HiveKey("aaaaa".getBytes()), 9)); + revCheck.add(new Check(new HiveKey("babbb".getBytes()), 8)); + revCheck.add(new Check(new HiveKey("baabb".getBytes()), 8)); + revCheck.add(new Check(new HiveKey("yai".getBytes()), 1)); + revCheck.add(new Check(new HiveKey("yak".getBytes()), 1)); + revCheck.add(new Check(new HiveKey("z".getBytes()), 0)); + revCheck.add(new Check(new HiveKey("ddngo".getBytes()), 4)); + revCheck.add(new Check(new HiveKey("hi".getBytes()), 3)); + try { + partitioner.configure(new JobConf(conf)); + NullWritable nw = NullWritable.get(); + for (Check chk : revCheck) { + assertEquals(chk.data.toString(), chk.part, + partitioner.getPartition(chk.data, nw, splitStrings.length + 1)); + } + } finally { + p.getFileSystem(conf).delete(p, true); + } + } } \ No newline at end of file