diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
index c8a428c323..a751211373 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
@@ -118,9 +118,13 @@ public static void afterTest() throws Exception {
 
   @Test(timeout = 200000)
   public void testGenericUDTFOrderBySplitCount1() throws Exception {
-    String query = "select get_splits(" + "'select value from " + tableName + "', 5)";
+    String query = "select get_splits(" + "'select value from " + tableName + "', 10)";
     runQuery(query, getConfigs(), 10);
 
+    // Check number of splits is respected
+    query = "select get_splits(" + "'select value from " + tableName + "', 3)";
+    runQuery(query, getConfigs(), 3);
+
     query = "select get_splits(" + "'select value from " + tableName + " order by under_col', 5)";
     runQuery(query, getConfigs(), 1);
 
@@ -131,7 +135,7 @@ public void testGenericUDTFOrderBySplitCount1() throws Exception {
     List<String> setCmds = getConfigs();
     setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false");
     query = "select get_splits(" +
-        "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+        "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 10)";
     runQuery(query, setCmds, 10);
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 6daa8df63d..f1946729aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -25,6 +25,7 @@
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.OptionalInt;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
@@ -91,9 +92,16 @@
   private final MapWork work;
   private final SplitGrouper splitGrouper = new SplitGrouper();
   private final SplitLocationProvider splitLocationProvider;
+  private final OptionalInt numSplits;
+
   private boolean generateSingleSplit;
 
   public HiveSplitGenerator(Configuration conf, MapWork work, final boolean generateSingleSplit) throws IOException {
+    this(conf, work, generateSingleSplit, null);
+  }
+
+  public HiveSplitGenerator(Configuration conf, MapWork work, final boolean generateSingleSplit, Integer numSplits)
+      throws IOException {
     super(null);
 
     this.conf = conf;
@@ -117,6 +125,11 @@ public HiveSplitGenerator(Configuration conf, MapWork work, final boolean genera
     // initialized, which may cause it to drop events.
     // No dynamic partition pruning
     pruner = null;
+    if (numSplits == null) {
+      this.numSplits = OptionalInt.empty();
+    } else {
+      this.numSplits = OptionalInt.of(numSplits);
+    }
   }
 
   public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
@@ -145,7 +158,7 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE
     // Setting it up in initialize leads to a window where events may come in before the pruner is
    // initialized, which may cause it to drop events.
     pruner = new DynamicPartitionPruner(initializerContext, work, jobConf);
-
+    this.numSplits = OptionalInt.empty();
   }
 
   @SuppressWarnings("unchecked")
@@ -201,10 +214,16 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE
         LOG.info("The preferred split size is " + preferredSplitSize);
       }
 
+      float waves;
       // Create the un-grouped splits
-      float waves =
-          conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
-              TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
+      if (numSplits.isPresent()) {
+        waves = (float)numSplits.getAsInt() / availableSlots;
+      } else {
+        waves =
+            conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+                TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
+
+      }
 
       InputSplit[] splits;
       if (generateSingleSplit &&
@@ -232,7 +251,7 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE
         splits[0] = new HiveInputFormat.HiveInputSplit(fileSplit, partIF);
       } else {
         // Raw splits
-        splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+        splits = inputFormat.getSplits(jobConf, numSplits.orElse((int) (availableSlots * waves)));
       }
       // Sort the splits, so that subsequent grouping is consistent.
       Arrays.sort(splits, new InputSplitComparator());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index a29b560453..06361da77c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -416,7 +416,8 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli
 
       Preconditions.checkState(HiveConf.getBoolVar(wxConf, ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
 
-      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork, generateSingleSplit);
+      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork,
+          generateSingleSplit, numSplits);
 
       List<Event> eventList = splitGenerator.initialize();
       InputSplit[] result = new InputSplit[eventList.size() - 1];
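
Reviewer note (illustration only, not part of the patch): the behavioral core of the change is that an explicitly requested split count now overrides the wave-based estimate when HiveSplitGenerator generates raw splits. Below is a minimal standalone sketch of that decision logic; it uses only java.util.OptionalInt, assumes Tez's default wave factor of 1.7, and all class, method, and constant names in it are invented for the example.

import java.util.OptionalInt;

public class SplitCountSketch {
  // Stand-in for TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT (assumed 1.7f here).
  static final float DEFAULT_WAVES = 1.7f;

  static int desiredSplits(OptionalInt numSplits, int availableSlots) {
    // When a split count is requested (e.g. get_splits(query, n)), derive waves from n
    // so grouping still targets roughly n tasks; otherwise fall back to the configured waves.
    float waves = numSplits.isPresent()
        ? (float) numSplits.getAsInt() / availableSlots
        : DEFAULT_WAVES;
    // Raw split generation uses the explicit count when present, the wave-based estimate otherwise,
    // mirroring inputFormat.getSplits(jobConf, numSplits.orElse((int) (availableSlots * waves))).
    return numSplits.orElse((int) (availableSlots * waves));
  }

  public static void main(String[] args) {
    System.out.println(desiredSplits(OptionalInt.of(3), 10));   // 3: explicit request wins
    System.out.println(desiredSplits(OptionalInt.empty(), 10)); // 17: slots * default waves
  }
}

With this shape, get_splits(query, 3) maps to OptionalInt.of(3) and the raw split request is exactly 3, which is what the added runQuery(query, getConfigs(), 3) assertion in testGenericUDTFOrderBySplitCount1 checks.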