diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7b3acad511..a804fe3c21 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4652,6 +4652,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If LLAP external clients submits ORDER BY queries, force return a single split to guarantee reading\n" +
         "data out in ordered way. Setting this to false will let external clients read data out in parallel\n" +
         "losing the ordering (external clients are responsible for guaranteeing the ordering)"),
+    LLAP_EXTERNAL_SPLITS_LIMIT_FORCE_SINGLE_SPLIT("hive.llap.external.splits.limit.force.single.split", true,
+        "If LLAP external clients submit LIMIT queries, force return of a single split to guarantee reading\n" +
+        "only as much data as set in the limit. Setting this to false will let external clients read data out\n" +
+        "in parallel, possibly reading more data than the limit"),
     LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR("hive.llap.external.client.use.hybrid.calendar", false,
         "Whether to use hybrid calendar for parsing of data/timestamps."),
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java
index 8cbca69737..11b82ca34f 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java
@@ -164,6 +164,9 @@ protected void testGenericUDTFOrderBySplitCount1(String udtfName, int[] expectedCounts)
     query = "select " + udtfName + "(" + "'select value from " + tableName + " order by under_col limit 0', 5)";
     runQuery(query, getConfigs(), expectedCounts[2]);
 
+    query = "select get_splits(" + "'select value from " + tableName + " limit 2', 5)";
+    runQuery(query, getConfigs(), 1);
+
     query = "select " + udtfName + "(" + "'select `value` from (select value from " + tableName +
         " where value is not null order by value) as t', 5)";
     runQuery(query, getConfigs(), expectedCounts[3]);
@@ -173,5 +176,9 @@ protected void testGenericUDTFOrderBySplitCount1(String udtfName, int[] expectedCounts)
     query = "select " + udtfName + "(" + "'select `value` from (select value from " + tableName +
         " where value is not null order by value) as t', 5)";
     runQuery(query, setCmds, expectedCounts[4]);
+
+    setCmds.add("set hive.llap.external.splits.limit.force.single.split=false");
+    query = "select get_splits('select `value` from (select value from " + tableName + " limit 3) as t', 5)";
+    runQuery(query, setCmds, 3);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 00a6c89b1e..b3a0062c64 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -133,6 +133,7 @@
   protected transient IntObjectInspector intOI;
   protected transient JobConf jc;
   private boolean orderByQuery;
+  private boolean limitQuery;
   private boolean forceSingleSplit;
   protected ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
   protected DataOutput dos = new DataOutputStream(bos);
@@ -237,7 +238,7 @@ protected SplitResult getSplitResult(boolean generateLightWeightSplits)
     TezWork tezWork = fragment.work;
     Schema schema = fragment.schema;
 
-    boolean generateSingleSplit = forceSingleSplit && orderByQuery;
+    boolean generateSingleSplit = forceSingleSplit && (orderByQuery || limitQuery);
 
     SplitResult splitResult = getSplits(jc, tezWork, schema, applicationId, generateSingleSplit,
         generateLightWeightSplits);
@@ -253,8 +254,8 @@ private void validateSplitResult(SplitResult splitResult, boolean generateLightWeightSplits)
     if (splits.length > 0 && generateLightWeightSplits) {
       Preconditions.checkNotNull(splitResult.planSplit, "plan split cannot be null");
     }
-    LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, inputArgQuery,
-        orderByQuery, forceSingleSplit);
+    LOG.info("Generated {} splits for query {}. orderByQuery: {} limitQuery: {} forceSingleSplit: {}", splits.length, inputArgQuery,
+        orderByQuery, limitQuery, forceSingleSplit);
     if (generateSingleSplit && splits.length > 1) {
       throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + inputArgQuery);
@@ -318,8 +319,11 @@ private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId)
     QueryPlan plan = driver.getPlan();
     orderByQuery = plan.getQueryProperties().hasOrderBy() || plan.getQueryProperties().hasOuterOrderBy();
+    limitQuery = plan.getQueryProperties().getOuterQueryLimit() != -1;
     forceSingleSplit = orderByQuery &&
         HiveConf.getBoolVar(conf, ConfVars.LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT);
+    forceSingleSplit = forceSingleSplit || (limitQuery &&
+        HiveConf.getBoolVar(conf, ConfVars.LLAP_EXTERNAL_SPLITS_LIMIT_FORCE_SINGLE_SPLIT));
     List<Task<?>> roots = plan.getRootTasks();
     Schema schema = convertSchema(plan.getResultSchema());
     boolean fetchTask = plan.getFetchTask() != null;
@@ -335,8 +339,12 @@ private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId)
       tezWork = ((TezTask) roots.get(0)).getWork();
     }
 
-    if (tezWork == null || tezWork.getAllWork().size() != 1) {
-
+    // Even though a simple limit query (select * from table limit 1) generates only mapper work (no reduce phase),
+    // a single split is still required to read the data.
+    // Directly forcing a single split will not work, as the table might contain multiple files and
+    // a single split requires a single file to read from (HiveSplitGenerator requires a single file).
+    // Therefore, a limit query needs to be materialized.
+    if (tezWork == null || tezWork.getAllWork().size() != 1 || limitQuery) {
       String tableName = "table_" + UUID.randomUUID().toString().replaceAll("-", "");
       String storageFormatString = getTempTableStorageFormatString(conf);
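
Reviewer note, not part of the patch: a minimal sketch of how an LLAP external client would observe the new flag end to end over plain JDBC. The host/port, credentials, and the src table are placeholder assumptions, and, as in the test above, the number of rows returned by the get_splits UDTF is used as a proxy for the number of generated splits.

// Minimal sketch (assumptions: a HiveServer2 at localhost:10000, hive-jdbc on the
// classpath, and a populated table named src). Counts rows returned by the
// get_splits UDTF, which corresponds to the number of generated splits.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class GetSplitsLimitExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://localhost:10000/default", "hive", "");
         Statement stmt = conn.createStatement()) {
      // Default (flag is true): the LIMIT query is materialized into a temp
      // table and served through exactly one split.
      printSplitCount(stmt, "select get_splits('select value from src limit 2', 5)");

      // Opting out: splits may be generated in parallel, and a client reading
      // all of them can end up reading more rows than the limit.
      stmt.execute("set hive.llap.external.splits.limit.force.single.split=false");
      printSplitCount(stmt, "select get_splits('select value from src limit 2', 5)");
    }
  }

  private static void printSplitCount(Statement stmt, String query) throws Exception {
    int splits = 0;
    try (ResultSet rs = stmt.executeQuery(query)) {
      while (rs.next()) {
        splits++;
      }
    }
    System.out.println(splits + " split(s) for: " + query);
  }
}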