diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java index 8cbca69737..ca8977af09 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java @@ -42,6 +42,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * AbstractTestJdbcGenericUDTFGetSplits. @@ -50,6 +51,7 @@ protected static MiniHS2 miniHS2 = null; protected static String dataFileDir; protected static String tableName = "testtab1"; + protected static String partitionedTableName = "partitionedtesttab1"; protected static HiveConf conf = null; static Path kvDataFilePath; protected Connection hs2Conn = null; @@ -164,14 +166,77 @@ protected void testGenericUDTFOrderBySplitCount1(String udtfName, int[] expected query = "select " + udtfName + "(" + "'select value from " + tableName + " order by under_col limit 0', 5)"; runQuery(query, getConfigs(), expectedCounts[2]); + query = "select " + udtfName + "(" + "'select value from " + tableName + " limit 2', 5)"; + runQuery(query, getConfigs(), expectedCounts[3]); + + query = "select " + udtfName + "(" + "'select value from " + tableName + " group by value limit 2', 5)"; + runQuery(query, getConfigs(), expectedCounts[4]); + + query = "select " + udtfName + "(" + "'select value from " + tableName + " where value is not null limit 2', 5)"; + runQuery(query, getConfigs(), expectedCounts[5]); + query = "select " + udtfName + "(" + "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)"; - runQuery(query, getConfigs(), expectedCounts[3]); + runQuery(query, getConfigs(), expectedCounts[6]); List setCmds = getConfigs(); setCmds.add("set 
hive.llap.external.splits.order.by.force.single.split=false"); query = "select " + udtfName + "(" + "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)"; - runQuery(query, setCmds, expectedCounts[4]); + runQuery(query, setCmds, expectedCounts[7]); + } + + protected void testGenericUDTFOrderBySplitCount1OnPartitionedTable(String udtfName, int[] expectedCounts) throws Exception { + createPartitionedTestTable(null, partitionedTableName); + + String query = "select " + udtfName + "(" + "'select id from " + partitionedTableName + "', 5)"; + runQuery(query, getConfigs(), expectedCounts[0]); + + query = "select " + udtfName + "(" + "'select id from " + partitionedTableName + " order by id', 5)"; + runQuery(query, getConfigs(), expectedCounts[1]); + + query = "select " + udtfName + "(" + "'select id from " + partitionedTableName + " limit 2', 5)"; + runQuery(query, getConfigs(), expectedCounts[2]); + + query = "select " + udtfName + "(" + "'select id from " + partitionedTableName + " where id != 0 limit 2', 5)"; + runQuery(query, getConfigs(), expectedCounts[3]); + + query = "select " + udtfName + "(" + "'select id from " + partitionedTableName + " group by id limit 2', 5)"; + runQuery(query, getConfigs(), expectedCounts[4]); + + } + + private void createPartitionedTestTable(String database, String tableName) throws Exception { + Statement stmt = hs2Conn.createStatement(); + + if (database != null) { + stmt.execute("CREATE DATABASE IF NOT EXISTS " + database); + stmt.execute("USE " + database); + } + + // create table + stmt.execute("DROP TABLE IF EXISTS " + tableName); + stmt.execute("CREATE TABLE " + tableName + + " (id INT) partitioned by (p1 int)"); + + // load data + for (int i=1; i<=5; i++) { + String values = ""; + for (int j=1; j<=10; j++) { + if (j != 10) { + values+= "(" + j +"),"; + } else { + values+= "(" + j +")"; + } + } + stmt.execute("insert into " + tableName + " partition (p1=" + i +") " + " values 
" + values); + } + + + ResultSet res = stmt.executeQuery("SELECT count(*) FROM " + tableName); + assertTrue(res.next()); + assertEquals(50, res.getInt(1)); + res.close(); + stmt.close(); } } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java index defbe78802..fccf5ed4dd 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java @@ -38,9 +38,15 @@ @Test(timeout = 200000) public void testGenericUDTFOrderBySplitCount1() throws Exception { - super.testGenericUDTFOrderBySplitCount1("get_splits", new int[]{10, 1, 0, 1, 10}); + super.testGenericUDTFOrderBySplitCount1("get_splits", new int[]{10, 1, 0, 2, 2, 2, 1, 10}); } + @Test(timeout = 200000) + public void testGenericUDTFOrderBySplitCount1OnPartitionedTable() throws Exception { + super.testGenericUDTFOrderBySplitCount1OnPartitionedTable("get_splits", new int[]{10, 1, 2, 2, 2}); + } + + @Test public void testDecimalPrecisionAndScale() throws Exception { try (Statement stmt = hs2Conn.createStatement()) { diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java index 330174513c..d296d56002 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java @@ -25,7 +25,12 @@ @Test(timeout = 200000) public void testGenericUDTFOrderBySplitCount1() throws Exception { - super.testGenericUDTFOrderBySplitCount1("get_llap_splits", new int[]{12, 3, 1, 3, 12}); + super.testGenericUDTFOrderBySplitCount1("get_llap_splits", new int[]{12, 3, 1, 4, 4, 4, 3, 12}); + } + + @Test(timeout = 200000) + public void 
testGenericUDTFOrderBySplitCount1OnPartitionedTable() throws Exception { + super.testGenericUDTFOrderBySplitCount1OnPartitionedTable("get_llap_splits", new int[]{12, 3, 4, 4, 4}); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index 00a6c89b1e..7682e1f5c2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -133,6 +133,7 @@ protected transient IntObjectInspector intOI; protected transient JobConf jc; private boolean orderByQuery; + private boolean limitQuery; private boolean forceSingleSplit; protected ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); protected DataOutput dos = new DataOutputStream(bos); @@ -318,6 +319,7 @@ private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId) QueryPlan plan = driver.getPlan(); orderByQuery = plan.getQueryProperties().hasOrderBy() || plan.getQueryProperties().hasOuterOrderBy(); + limitQuery = plan.getQueryProperties().getOuterQueryLimit() != -1; forceSingleSplit = orderByQuery && HiveConf.getBoolVar(conf, ConfVars.LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT); List> roots = plan.getRootTasks(); @@ -334,9 +336,10 @@ private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId) } else { tezWork = ((TezTask) roots.get(0)).getWork(); } - - if (tezWork == null || tezWork.getAllWork().size() != 1) { - + // A simple limit query (select * from table limit n) generates only mapper work (no reduce phase). + // This can create multiple splits ignoring limit constraint, and multiple llap daemons working on those splits + // return more than "n" rows. Therefore, a limit query needs to be materialized. 
+ if (tezWork == null || tezWork.getAllWork().size() != 1 || limitQuery) { String tableName = "table_" + UUID.randomUUID().toString().replaceAll("-", ""); String storageFormatString = getTempTableStorageFormatString(conf);