diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java new file mode 100644 index 0000000000..02dfe7c644 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.PrintStream; +import java.net.URL; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * AbstractTestJdbcGenericUDTFGetSplits. 
+ */ +public abstract class AbstractTestJdbcGenericUDTFGetSplits { + protected static MiniHS2 miniHS2 = null; + protected static String dataFileDir; + protected static String tableName = "testtab1"; + protected static HiveConf conf = null; + static Path kvDataFilePath; + protected Connection hs2Conn = null; + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + + conf = new HiveConf(); + conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); + conf.setTimeVar(HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true); + conf.setBoolVar(HiveConf.ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + conf.setVar(HiveConf.ConfVars.LLAP_IO_MEMORY_MODE, "none"); + conf.setVar(HiveConf.ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text"); + + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniHS2.MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @AfterClass + public static void afterTest() throws Exception { + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + } + + @Before + public void setUp() throws Exception { + hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + } + + @After + public void tearDown() throws Exception { + LlapBaseInputFormat.closeAll(); + hs2Conn.close(); + } + + protected void runQuery(final String query, final List setCmds, + final int numRows) throws Exception { + + Connection con = hs2Conn; + BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setErr(new PrintStream(baos)); // capture stderr + final Statement selStmt = con.createStatement(); + Throwable throwable = null; + int rowCount = 0; + try { + try { + if (setCmds != null) { + for (String setCmd : setCmds) { + selStmt.execute(setCmd); + } + } + ResultSet resultSet = selStmt.executeQuery(query); + while (resultSet.next()) { + rowCount++; + } + } catch (SQLException e) { + throwable = e; + } + selStmt.close(); + assertNull(throwable); + System.out.println("Expected " + numRows + " rows for query '" + query + "'. Got: " + rowCount); + assertEquals("Expected rows: " + numRows + " got: " + rowCount, numRows, rowCount); + } finally { + baos.close(); + } + + } + + protected List getConfigs(String... 
more) { + List setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict"); + setCmds.add("set mapred.min.split.size=10"); + setCmds.add("set mapred.max.split.size=10"); + setCmds.add("set tez.grouping.min-size=10"); + setCmds.add("set tez.grouping.max-size=10"); + // to get at least 10 splits + setCmds.add("set tez.grouping.split-waves=10"); + if (more != null) { + setCmds.addAll(Arrays.asList(more)); + } + return setCmds; + } + + protected void testGenericUDTFOrderBySplitCount1(String udtfName, int[] expectedCounts) throws Exception { + String query = "select " + udtfName + "(" + "'select value from " + tableName + "', 5)"; + runQuery(query, getConfigs(), expectedCounts[0]); + + query = "select " + udtfName + "(" + "'select value from " + tableName + " order by under_col', 5)"; + runQuery(query, getConfigs(), expectedCounts[1]); + + query = "select " + udtfName + "(" + "'select value from " + tableName + " order by under_col limit 0', 5)"; + runQuery(query, getConfigs(), expectedCounts[2]); + + query = "select " + udtfName + "(" + + "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)"; + runQuery(query, getConfigs(), expectedCounts[3]); + + List setCmds = getConfigs(); + setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false"); + query = "select " + udtfName + "(" + + "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)"; + runQuery(query, setCmds, expectedCounts[4]); + } +} diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java index 8467cea58c..f4e9f9a87c 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java @@ -101,11 +101,12 @@ * by sub-classes in a {@link org.junit.BeforeClass} initializer */ public abstract class BaseJdbcWithMiniLlap { - private static MiniHS2 miniHS2 = null; + private static String dataFileDir; private static Path kvDataFilePath; private static Path dataTypesFilePath; + protected static MiniHS2 miniHS2 = null; protected static HiveConf conf = null; protected static Connection hs2Conn = null; @@ -456,7 +457,7 @@ public void testComplexQuery() throws Exception { assertArrayEquals(new String[] {"val_0", "3"}, rowCollector.rows.get(0)); } - private interface RowProcessor { + protected interface RowProcessor { void process(Row row); } @@ -506,7 +507,8 @@ protected int processQuery(String query, int numSplits, RowProcessor rowProcesso protected abstract InputFormat getInputFormat(); - private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception { + protected int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) + throws Exception { String url = miniHS2.getJdbcURL(); String user = System.getProperty("user.name"); String pwd = user; diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java index f6f64b8b6c..7eae613437 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java @@ -16,125 +16,29 @@ package org.apache.hive.jdbc; 
-import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.PrintStream; -import java.net.URL; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.FieldDesc; import org.apache.hadoop.hive.llap.LlapBaseInputFormat; import org.apache.hadoop.hive.llap.LlapInputSplit; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.mapred.JobConf; -import org.apache.hive.jdbc.miniHS2.MiniHS2; -import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; -import com.google.common.collect.Lists; - -public class TestJdbcGenericUDTFGetSplits { - protected static MiniHS2 miniHS2 = null; - protected static String dataFileDir; - static Path kvDataFilePath; - protected static String tableName = "testtab1"; - - protected static HiveConf conf = null; - protected Connection hs2Conn = null; - - @BeforeClass - public static void beforeTest() throws Exception { - Class.forName(MiniHS2.getJdbcDriverName()); - - String confDir = "../../data/conf/llap/"; - HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); - System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); - - conf = new HiveConf(); - conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); - conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); - conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS); - conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true); - conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); - conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); - conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); - conf.setVar(ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text"); - - - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/tez-site.xml")); - - miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); - dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); - kvDataFilePath = new Path(dataFileDir, "kv1.txt"); - - Map confOverlay = new HashMap<>(); - miniHS2.start(confOverlay); - miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); - } - - @Before - public void setUp() throws Exception { - hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - } +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.UUID; - @After - public void tearDown() throws Exception { - LlapBaseInputFormat.closeAll(); - hs2Conn.close(); - } +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static 
org.junit.Assert.assertTrue; - @AfterClass - public static void afterTest() throws Exception { - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - } +/** + * TestJdbcGenericUDTFGetSplits. + */ +public class TestJdbcGenericUDTFGetSplits extends AbstractTestJdbcGenericUDTFGetSplits { @Test(timeout = 200000) public void testGenericUDTFOrderBySplitCount1() throws Exception { - String query = "select get_splits(" + "'select value from " + tableName + "', 5)"; - runQuery(query, getConfigs(), 10); - - query = "select get_splits(" + "'select value from " + tableName + " order by under_col', 5)"; - runQuery(query, getConfigs(), 1); - - query = "select get_splits(" + "'select value from " + tableName + " order by under_col limit 0', 5)"; - runQuery(query, getConfigs(), 0); - - query = "select get_splits(" + - "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)"; - runQuery(query, getConfigs(), 1); - - List setCmds = getConfigs(); - setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false"); - query = "select get_splits(" + - "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)"; - runQuery(query, setCmds, 10); + super.testGenericUDTFOrderBySplitCount1("get_splits", new int[]{10, 1, 0, 1, 10}); } @Test @@ -174,54 +78,4 @@ public void testDecimalPrecisionAndScale() throws Exception { } } - - private void runQuery(final String query, final List setCmds, - final int numRows) throws Exception { - - Connection con = hs2Conn; - BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - System.setErr(new PrintStream(baos)); // capture stderr - final Statement selStmt = con.createStatement(); - Throwable throwable = null; - int rowCount = 0; - try { - try { - if (setCmds != null) { - for (String setCmd : setCmds) { - selStmt.execute(setCmd); - } - } - ResultSet resultSet = selStmt.executeQuery(query); - while(resultSet.next()) { - rowCount++; - } - } catch (SQLException e) { - throwable = e; - } - selStmt.close(); - assertNull(throwable); - System.out.println("Expected " + numRows + " rows for query '" + query + "'. Got: " + rowCount); - assertEquals("Expected rows: " + numRows + " got: " + rowCount, numRows, rowCount); - } finally { - baos.close(); - } - - } - - List getConfigs(String... more) { - List setCmds = new ArrayList<>(); - setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict"); - setCmds.add("set mapred.min.split.size=10"); - setCmds.add("set mapred.max.split.size=10"); - setCmds.add("set tez.grouping.min-size=10"); - setCmds.add("set tez.grouping.max-size=10"); - // to get at least 10 splits - setCmds.add("set tez.grouping.split-waves=10"); - if (more != null) { - setCmds.addAll(Arrays.asList(more)); - } - return setCmds; - } } \ No newline at end of file diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java new file mode 100644 index 0000000000..330174513c --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import org.junit.Test; + +/** + * TestJdbcGenericUDTFGetSplits2. + */ +public class TestJdbcGenericUDTFGetSplits2 extends AbstractTestJdbcGenericUDTFGetSplits { + + @Test(timeout = 200000) + public void testGenericUDTFOrderBySplitCount1() throws Exception { + super.testGenericUDTFOrderBySplitCount1("get_llap_splits", new int[]{12, 3, 1, 3, 12}); + } + +} diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java new file mode 100644 index 0000000000..e2884d11d9 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hive.jdbc; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hadoop.hive.llap.LlapInputSplit; +import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.assertTrue; + +/** + * TestNewGetSplitsFormat. 
+ */ +public class TestNewGetSplitsFormat extends BaseJdbcWithMiniLlap { + + @BeforeClass public static void beforeTest() throws Exception { + HiveConf conf = defaultConf(); + conf.setBoolVar(HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true); + BaseJdbcWithMiniLlap.beforeTest(conf); + } + + @Override protected InputFormat getInputFormat() { + //For unit testing, no harm in hard-coding allocator ceiling to LONG.MAX_VALUE + return new LlapArrowRowInputFormat(Long.MAX_VALUE); + } + + @Override public void testDataTypes() throws Exception { + TestJdbcWithMiniLlapVectorArrow testJdbcWithMiniLlapVectorArrow = new TestJdbcWithMiniLlapVectorArrow(); + testJdbcWithMiniLlapVectorArrow.testDataTypes(); + } + + @Override protected int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) + throws Exception { + String url = miniHS2.getJdbcURL(); + String user = System.getProperty("user.name"); + String pwd = user; + String handleId = UUID.randomUUID().toString(); + + InputFormat inputFormat = getInputFormat(); + + // Get splits + JobConf job = new JobConf(conf); + job.set(LlapBaseInputFormat.URL_KEY, url); + job.set(LlapBaseInputFormat.USER_KEY, user); + job.set(LlapBaseInputFormat.PWD_KEY, pwd); + job.set(LlapBaseInputFormat.QUERY_KEY, query); + job.set(LlapBaseInputFormat.HANDLE_ID, handleId); + job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "true"); + if (currentDatabase != null) { + job.set(LlapBaseInputFormat.DB_KEY, currentDatabase); + } + + InputSplit[] splits = inputFormat.getSplits(job, numSplits); + assertTrue(splits.length > 2); + + // populate actual splits with schema and planBytes[] + LlapInputSplit schemaSplit = (LlapInputSplit) splits[0]; + LlapInputSplit planSplit = (LlapInputSplit) splits[1]; + + List actualSplits = new ArrayList<>(); + + for (int i = 2; i < splits.length; i++) { + LlapInputSplit actualSplit = (LlapInputSplit) splits[i]; + actualSplit.setSchema(schemaSplit.getSchema()); + actualSplit.setPlanBytes(planSplit.getPlanBytes()); + actualSplits.add(actualSplit); + } + + // Fetch rows from splits + int rowCount = 0; + for (InputSplit split : actualSplits) { + System.out.println("Processing split " + split.getLocations()); + RecordReader reader = inputFormat.getRecordReader(split, job, null); + Row row = reader.createValue(); + while (reader.next(NullWritable.get(), row)) { + rowProcessor.process(row); + ++rowCount; + } + //In arrow-mode this will throw exception unless all buffers have been released + //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader + reader.close(); + } + LlapBaseInputFormat.close(handleId); + + return rowCount; + } + +} diff --git llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java index 28966515ad..068a9133b5 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java @@ -91,6 +91,14 @@ public int getSplitNum() { return tokenBytes; } + public void setPlanBytes(byte[] planBytes) { + this.planBytes = planBytes; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } + @Override public void write(DataOutput out) throws IOException { out.writeInt(splitNum); diff --git llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java 
index 2aa82b58f2..5c99655104 100644 --- llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -63,7 +63,6 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.InputSplitWithLocationInfo; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; @@ -116,10 +115,11 @@ public static final String PWD_KEY = "llap.if.pwd"; public static final String HANDLE_ID = "llap.if.handleid"; public static final String DB_KEY = "llap.if.database"; + public static final String USE_NEW_SPLIT_FORMAT = "llap.if.use.new.split.format"; public static final String SESSION_QUERIES_FOR_GET_NUM_SPLITS = "llap.session.queries.for.get.num.splits"; public static final Pattern SET_QUERY_PATTERN = Pattern.compile("^\\s*set\\s+.*=.+$", Pattern.CASE_INSENSITIVE); - public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; + public static final String SPLIT_QUERY = "select get_llap_splits(\"%s\",%d)"; public static final LlapServiceInstance[] serviceInstanceArray = new LlapServiceInstance[0]; public LlapBaseInputFormat(String url, String user, String pwd, String query) { @@ -281,13 +281,43 @@ public LlapBaseInputFormat() { } } + // In case of USE_NEW_SPLIT_FORMAT=true, following format is used + // type split + // schema-split LlapInputSplit -- contains only schema + // plan-split LlapInputSplit -- contains only planBytes[] + // 0 LlapInputSplit -- actual split 1 + // 1 LlapInputSplit -- actual split 2 + // ... ... + boolean useNewSplitFormat = job.getBoolean(USE_NEW_SPLIT_FORMAT, false); + ResultSet res = stmt.executeQuery(sql); + int count = 0; + LlapInputSplit schemaSplit = null; + LlapInputSplit planSplit = null; while (res.next()) { // deserialize split - DataInput in = new DataInputStream(res.getBinaryStream(1)); - InputSplitWithLocationInfo is = new LlapInputSplit(); + DataInput in = new DataInputStream(res.getBinaryStream(2)); + LlapInputSplit is = new LlapInputSplit(); is.readFields(in); - ins.add(is); + if (useNewSplitFormat) { + ins.add(is); + } else { + // to keep the old format, populate schema and planBytes[] in actual splits + if (count == 0) { + schemaSplit = is; + if (numSplits == 0) { + ins.add(schemaSplit); + } + } else if (count == 1) { + planSplit = is; + } else { + is.setSchema(schemaSplit.getSchema()); + assert planSplit != null; + is.setPlanBytes(planSplit.getPlanBytes()); + ins.add(is); + } + count++; + } } res.close(); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 62d082d704..b0d7a4e8f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -538,6 +538,7 @@ system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class); system.registerGenericUDTF("stack", GenericUDTFStack.class); system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class); + system.registerGenericUDTF("get_llap_splits", GenericUDTFGetSplits2.class); system.registerGenericUDTF("get_sql_schema", GenericUDTFGetSQLSchema.class); //PTF declarations diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java 
ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index 5c760e883c..b6974fa420 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -135,14 +135,30 @@ protected transient JobConf jc; private boolean orderByQuery; private boolean forceSingleSplit; - private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); - private DataOutput dos = new DataOutputStream(bos); + protected ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); + protected DataOutput dos = new DataOutputStream(bos); + protected String inputArgQuery; + protected int inputArgNumSplits; + protected boolean schemaSplitOnly; @Override public StructObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { - LOG.debug("initializing GenericUDFGetSplits"); + validateInput(arguments); + + List names = Arrays.asList("split"); + List fieldOIs = Arrays + . asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector); + StructObjectInspector outputOI = ObjectInspectorFactory + .getStandardStructObjectInspector(names, fieldOIs); + + LOG.debug("done initializing GenericUDFGetSplits"); + return outputOI; + } + + protected void validateInput(ObjectInspector[] arguments) + throws UDFArgumentLengthException, UDFArgumentTypeException { if (SessionState.get() == null || SessionState.get().getConf() == null) { throw new IllegalStateException("Cannot run get splits outside HS2"); @@ -167,15 +183,6 @@ public StructObjectInspector initialize(ObjectInspector[] arguments) stringOI = (StringObjectInspector) arguments[0]; intOI = (IntObjectInspector) arguments[1]; - - List names = Arrays.asList("split"); - List fieldOIs = Arrays - . asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector); - StructObjectInspector outputOI = ObjectInspectorFactory - .getStandardStructObjectInspector(names, fieldOIs); - - LOG.debug("done initializing GenericUDFGetSplits"); - return outputOI; } public static class PlanFragment { @@ -192,9 +199,31 @@ public PlanFragment(TezWork work, Schema schema, JobConf jc) { @Override public void process(Object[] arguments) throws HiveException { + initArgs(arguments); + try { + SplitResult splitResult = getSplitResult(false); + InputSplit[] splits = schemaSplitOnly ? 
new InputSplit[]{splitResult.schemaSplit} : splitResult.actualSplits; + for (InputSplit s : splits) { + Object[] os = new Object[1]; + bos.reset(); + s.write(dos); + byte[] frozen = bos.toByteArray(); + os[0] = frozen; + forward(os); + } + } catch (Exception e) { + throw new HiveException(e); + } + } - String query = stringOI.getPrimitiveJavaObject(arguments[0]); - int num = intOI.get(arguments[1]); + protected void initArgs(Object[] arguments) { + inputArgQuery = stringOI.getPrimitiveJavaObject(arguments[0]); + inputArgNumSplits = intOI.get(arguments[1]); + schemaSplitOnly = inputArgNumSplits == 0; + } + + protected SplitResult getSplitResult(boolean generateLightWeightSplits) + throws HiveException, IOException { // Generate applicationId for the LLAP splits LlapCoordinator coordinator = LlapCoordinator.getInstance(); @@ -205,32 +234,36 @@ public void process(Object[] arguments) throws HiveException { ApplicationId applicationId = coordinator.createExtClientAppId(); LOG.info("Generated appID {} for LLAP splits", applicationId.toString()); - PlanFragment fragment = createPlanFragment(query, num, applicationId); + PlanFragment fragment = createPlanFragment(inputArgQuery, applicationId); TezWork tezWork = fragment.work; Schema schema = fragment.schema; boolean generateSingleSplit = forceSingleSplit && orderByQuery; - try { - InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit); - LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, query, - orderByQuery, forceSingleSplit); - if (generateSingleSplit && splits.length > 1) { - throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + query); + + SplitResult splitResult = getSplits(jc, tezWork, schema, applicationId, generateSingleSplit, + generateLightWeightSplits); + validateSplitResult(splitResult, generateLightWeightSplits, generateSingleSplit); + return splitResult; + } + + private void validateSplitResult(SplitResult splitResult, boolean generateLightWeightSplits, + boolean generateSingleSplit) throws HiveException { + Preconditions.checkNotNull(splitResult.schemaSplit, "schema split cannot be null"); + if (!schemaSplitOnly) { + InputSplit[] splits = splitResult.actualSplits; + if (splits.length > 0 && generateLightWeightSplits) { + Preconditions.checkNotNull(splitResult.planSplit, "plan split cannot be null"); } - for (InputSplit s : splits) { - Object[] os = new Object[1]; - bos.reset(); - s.write(dos); - byte[] frozen = bos.toByteArray(); - os[0] = frozen; - forward(os); + LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, inputArgQuery, + orderByQuery, forceSingleSplit); + if (generateSingleSplit && splits.length > 1) { + throw new HiveException("Got more than one split (Got: " + splits.length + + ") for order by query: " + inputArgQuery); } - } catch (Exception e) { - throw new HiveException(e); } } - public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId) + private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId) throws HiveException { HiveConf conf = new HiveConf(SessionState.get().getConf()); @@ -250,7 +283,7 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli // hive compiler is going to remove inner order by. disable that optimization until then. 
HiveConf.setBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY, false); - if(num == 0) { + if (schemaSplitOnly) { //Schema only try { List fieldSchemas = ParseUtils.parseQueryAndGetSchema(conf, query); @@ -378,16 +411,21 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli } } - public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId, - final boolean generateSingleSplit) - throws IOException { - if(numSplits == 0) { - //Schema only - LlapInputSplit schemaSplit = new LlapInputSplit( - 0, new byte[0], new byte[0], new byte[0], - new SplitLocationInfo[0], schema, "", new byte[0]); - return new InputSplit[] { schemaSplit }; + // generateLightWeightSplits - if true then + // 1) schema and planBytes[] in each LlapInputSplit are not populated + // 2) schemaSplit(contains only schema) and planSplit(contains only planBytes[]) are populated in SplitResult + private SplitResult getSplits(JobConf job, TezWork work, Schema schema, ApplicationId applicationId, + final boolean generateSingleSplit, boolean generateLightWeightSplits) + throws IOException { + + SplitResult splitResult = new SplitResult(); + splitResult.schemaSplit = new LlapInputSplit( + 0, new byte[0], new byte[0], new byte[0], + new SplitLocationInfo[0], schema, "", new byte[0]); + if (schemaSplitOnly) { + // schema only + return splitResult; } DAG dag = DAG.create(work.getName()); @@ -429,14 +467,15 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork, generateSingleSplit); List eventList = splitGenerator.initialize(); - InputSplit[] result = new InputSplit[eventList.size() - 1]; + int numGroupedSplitsGenerated = eventList.size() - 1; + InputSplit[] result = new InputSplit[numGroupedSplitsGenerated]; InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0); List hints = configureEvent.getLocationHint().getTaskLocationHints(); - Preconditions.checkState(hints.size() == eventList.size() - 1); + Preconditions.checkState(hints.size() == numGroupedSplitsGenerated); if (LOG.isDebugEnabled()) { LOG.debug("NumEvents=" + eventList.size() + ", NumSplits=" + result.length); @@ -472,11 +511,14 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli // Generate umbilical token (applies to all splits) Token umbilicalToken = JobTokenCreator.createJobToken(applicationId); - LOG.info("Number of splits: " + (eventList.size() - 1)); + LOG.info("Number of splits: " + numGroupedSplitsGenerated); SignedMessage signedSvs = null; - for (int i = 0; i < eventList.size() - 1; i++) { + byte[] submitWorkBytes = null; + final byte[] emptySubmitWorkBytes = new byte[0]; + final Schema emptySchema = new Schema(); + for (int i = 0; i < numGroupedSplitsGenerated; i++) { TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, - eventList.size() - 1, applicationId, i); + numGroupedSplitsGenerated, applicationId, i); // 2. Generate the vertex/submit information for all events. if (i == 0) { @@ -488,28 +530,44 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli // Despite the differences in TaskSpec, the vertex spec should be the same. 
signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, queryUser, applicationId.toString()); + SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId, + System.currentTimeMillis(), numGroupedSplitsGenerated, signedSvs.message, + signedSvs.signature, umbilicalToken); + submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo); + if (generateLightWeightSplits) { + splitResult.planSplit = new LlapInputSplit( + 0, submitWorkBytes, new byte[0], new byte[0], + new SplitLocationInfo[0], new Schema(), "", new byte[0]); + } } - SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId, - System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message, - signedSvs.signature, umbilicalToken); - byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo); - // 3. Generate input event. SignedMessage eventBytes = makeEventBytes(wx, vertexName, eventList.get(i + 1), signer); // 4. Make location hints. SplitLocationInfo[] locations = makeLocationHints(hints.get(i)); - result[i] = new LlapInputSplit(i, submitWorkBytes, eventBytes.message, - eventBytes.signature, locations, schema, llapUser, tokenBytes); - } - return result; + if (generateLightWeightSplits) { + result[i] = new LlapInputSplit(i, emptySubmitWorkBytes, eventBytes.message, + eventBytes.signature, locations, emptySchema, llapUser, tokenBytes); + } else { + result[i] = new LlapInputSplit(i, submitWorkBytes, eventBytes.message, + eventBytes.signature, locations, schema, llapUser, tokenBytes); + } + } + splitResult.actualSplits = result; + return splitResult; } catch (Exception e) { throw new IOException(e); } } + static class SplitResult { + InputSplit schemaSplit; + InputSplit planSplit; + InputSplit[] actualSplits; + } + private static class DriverCleanup implements Closeable { private final Driver driver; private final HiveTxnManager txnManager; diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits2.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits2.java new file mode 100644 index 0000000000..ae53ab7e9f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits2.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * GenericUDTFGetSplits2 - memory-efficient version of GenericUDTFGetSplits.
+ * It separates out information such as the schema and planBytes[], which are common to all the splits.
+ * This produces output in the following format:
+ *
+ * type               split
+ * ----------------------------------------------------
+ * schema-split       LlapInputSplit -- contains only schema
+ * plan-split         LlapInputSplit -- contains only planBytes[]
+ * 0                  LlapInputSplit -- actual split 1
+ * 1                  LlapInputSplit -- actual split 2
+ * ...                ...
+ */
+@Description(name = "get_llap_splits", value = "_FUNC_(string,int) - "
+    + "Returns an array of length int serialized splits for the referenced tables string."
+    + " Passing length 0 returns only schema data for the compiled query. "
+    + "The order of splits is: schema-split, plan-split, 0, 1, 2... where 0, 1, 2... are the actual splits. "
+    + "This UDTF is for internal use by LlapBaseInputFormat and not to be invoked explicitly")
+@UDFType(deterministic = false)
+public class GenericUDTFGetSplits2 extends GenericUDTFGetSplits {
+  private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits2.class);
+
+  @Override public StructObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+    LOG.debug("initializing GenericUDFGetSplits2");
+    validateInput(arguments);
+
+    List<String> names = Arrays.asList("type", "split");
+    List<ObjectInspector> fieldOIs = Arrays.asList(PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+        PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+    StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+
+    LOG.debug("done initializing GenericUDFGetSplits2");
+    return outputOI;
+  }
+
+  @Override public void process(Object[] arguments) throws HiveException {
+    try {
+      initArgs(arguments);
+      SplitResult splitResult = getSplitResult(true);
+      forwardOutput(splitResult);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  private void forwardOutput(SplitResult splitResult) throws IOException, HiveException {
+    for (Map.Entry<String, InputSplit> entry : transformSplitResult(splitResult).entrySet()) {
+      Object[] os = new Object[2];
+      os[0] = entry.getKey();
+      InputSplit split = entry.getValue();
+      bos.reset();
+      split.write(dos);
+      os[1] = bos.toByteArray();
+      forward(os);
+    }
+  }
+
+  private Map<String, InputSplit> transformSplitResult(SplitResult splitResult) {
+    Map<String, InputSplit> splitMap = new LinkedHashMap<>();
+    splitMap.put("schema-split", splitResult.schemaSplit);
+    if (splitResult.actualSplits != null && splitResult.actualSplits.length > 0) {
+      Preconditions.checkNotNull(splitResult.planSplit);
+      splitMap.put("plan-split", splitResult.planSplit);
+      for (int i = 0; i < splitResult.actualSplits.length; i++) {
+        splitMap.put("" + i, splitResult.actualSplits[i]);
+      }
+    }
+    return splitMap;
+  }
+}
diff --git ql/src/test/results/clientpositive/show_functions.q.out ql/src/test/results/clientpositive/show_functions.q.out
index 04a0fb4458..d88a5f2c67 100644
--- ql/src/test/results/clientpositive/show_functions.q.out
+++ ql/src/test/results/clientpositive/show_functions.q.out
@@ -129,6 +129,7 @@ format_number
 from_unixtime
 from_utc_timestamp
 get_json_object
+get_llap_splits
 get_splits
 get_sql_schema
 greatest
@@ -562,6 +563,7 @@ format_number
 from_unixtime
 from_utc_timestamp
 get_json_object
+get_llap_splits
 get_splits
 get_sql_schema
 greatest
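
Usage sketch (illustrative only, not part of the patch): how an external client might consume the new two-column split format end to end, modeled on TestNewGetSplitsFormat above. The JDBC URL, credentials, and query are placeholders, and LlapRowInputFormat is assumed here purely for illustration; any of the LLAP external-client input formats should work the same way.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
    import org.apache.hadoop.hive.llap.LlapInputSplit;
    import org.apache.hadoop.hive.llap.LlapRowInputFormat;
    import org.apache.hadoop.hive.llap.Row;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;

    // Placeholder connection details; point these at a running HS2/LLAP endpoint.
    JobConf job = new JobConf(new Configuration());
    job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://hs2-host:10000/default");
    job.set(LlapBaseInputFormat.USER_KEY, "hive");
    job.set(LlapBaseInputFormat.PWD_KEY, "hive");
    job.set(LlapBaseInputFormat.QUERY_KEY, "select value from testtab1");
    job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "true");

    InputFormat<NullWritable, Row> inputFormat = new LlapRowInputFormat();
    InputSplit[] splits = inputFormat.getSplits(job, 5);

    // With USE_NEW_SPLIT_FORMAT=true the first two splits are lightweight carriers:
    // splits[0] holds only the schema, splits[1] only the serialized plan. The caller
    // copies both into each remaining "actual" split before reading rows from it.
    LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
    LlapInputSplit planSplit = (LlapInputSplit) splits[1];
    for (int i = 2; i < splits.length; i++) {
      LlapInputSplit actual = (LlapInputSplit) splits[i];
      actual.setSchema(schemaSplit.getSchema());
      actual.setPlanBytes(planSplit.getPlanBytes());
      RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(actual, job, null);
      Row row = reader.createValue();
      while (reader.next(NullWritable.get(), row)) {
        // process row
      }
      reader.close();
    }

When USE_NEW_SPLIT_FORMAT is unset or false, LlapBaseInputFormat folds the schema and plan back into every split it returns, so existing callers continue to see fully populated splits exactly as before.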