diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index dd42fd1..eb3a930 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4171,6 +4171,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT("hive.llap.external.splits.temp.table.storage.format", "orc",
         new StringSet("default", "text", "orc"),
         "Storage format for temp tables created using LLAP external client"),
+    LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT("hive.llap.external.splits.order.by.force.single.split",
+        true,
+        "If LLAP external clients submit ORDER BY queries, force return of a single split to guarantee reading\n" +
+        "data out in an ordered way. Setting this to false will let external clients read data out in parallel,\n" +
+        "losing the ordering (external clients are then responsible for guaranteeing the ordering)"),
     LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false,
         "Override if grace join should be allowed to run in llap."),
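To see the new flag from the client side, the sketch below mirrors what the test added by this patch does over plain JDBC: run get_splits on an ORDER BY query with the default setting, then opt out with a set command. get_splits returns one row per generated split, so counting result rows counts splits. This is an illustrative sketch only, not part of the patch; the JDBC URL, credentials, and table name are placeholders, and the comments assume enough data and grouping settings to otherwise produce multiple splits.

// Illustrative client sketch (not part of the patch): URL, credentials, and table name are placeholders.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class GetSplitsClientSketch {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
         Statement stmt = con.createStatement()) {
      String orderByQuery = "select get_splits('select value from testtab1 order by value', 5)";

      // Default (true): the ORDER BY query is planned as exactly one split, so rows come back in order.
      try (ResultSet rs = stmt.executeQuery(orderByQuery)) {
        int splits = 0;
        while (rs.next()) {
          splits++;
        }
        System.out.println("Splits with force.single.split=true: " + splits); // expected: 1
      }

      // Opt out: splits can be read in parallel and the client must re-establish ordering itself.
      stmt.execute("set hive.llap.external.splits.order.by.force.single.split=false");
      try (ResultSet rs = stmt.executeQuery(orderByQuery)) {
        int splits = 0;
        while (rs.next()) {
          splits++;
        }
        System.out.println("Splits with force.single.split=false: " + splits); // typically more than 1
      }
    }
  }
}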
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
new file mode 100644
index 0000000..c8a428c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
+import org.apache.hadoop.hive.ql.wm.Action;
+import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+import org.apache.hadoop.hive.ql.wm.Expression;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
+import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestJdbcGenericUDTFGetSplits {
+  protected static MiniHS2 miniHS2 = null;
+  protected static String dataFileDir;
+  static Path kvDataFilePath;
+  protected static String tableName = "testtab1";
+
+  protected static HiveConf conf = null;
+  protected Connection hs2Conn = null;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+
+    String confDir = "../../data/conf/llap/";
+    HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+    System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+
+    conf = new HiveConf();
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
+    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
+    conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
+    conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
+    conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none");
+    conf.setVar(ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text");
+
+    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+        + "/tez-site.xml"));
+
+    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+
+    Map<String, String> confOverlay = new HashMap<>();
+    miniHS2.start(confOverlay);
+    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LlapBaseInputFormat.closeAll();
+    hs2Conn.close();
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+
+  @Test(timeout = 200000)
+  public void testGenericUDTFOrderBySplitCount1() throws Exception {
+    String query = "select get_splits(" + "'select value from " + tableName + "', 5)";
+    runQuery(query, getConfigs(), 10);
+
+    query = "select get_splits(" + "'select value from " + tableName + " order by under_col', 5)";
+    runQuery(query, getConfigs(), 1);
+
+    query = "select get_splits(" +
+        "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+    runQuery(query, getConfigs(), 1);
+
+    List<String> setCmds = getConfigs();
+    setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false");
+    query = "select get_splits(" +
+        "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+    runQuery(query, setCmds, 10);
+  }
+
+  private void runQuery(final String query, final List<String> setCmds,
+      final int numRows) throws Exception {
+
+    Connection con = hs2Conn;
+    BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setErr(new PrintStream(baos)); // capture stderr
+    final Statement selStmt = con.createStatement();
+    Throwable throwable = null;
+    int rowCount = 0;
+    try {
+      try {
+        if (setCmds != null) {
+          for (String setCmd : setCmds) {
+            selStmt.execute(setCmd);
+          }
+        }
+        ResultSet resultSet = selStmt.executeQuery(query);
+        while (resultSet.next()) {
+          rowCount++;
+        }
+      } catch (SQLException e) {
+        throwable = e;
+      }
+      selStmt.close();
+      assertNull(throwable);
+      System.out.println("Expected " + numRows + " rows for query '" + query + "'. Got: " + rowCount);
+      assertEquals("Expected rows: " + numRows + " got: " + rowCount, numRows, rowCount);
+    } finally {
+      baos.close();
+    }
+  }
+
+  List<String> getConfigs(String... more) {
+    List<String> setCmds = new ArrayList<>();
+    setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
+    setCmds.add("set mapred.min.split.size=10");
+    setCmds.add("set mapred.max.split.size=10");
+    setCmds.add("set tez.grouping.min-size=10");
+    setCmds.add("set tez.grouping.max-size=10");
+    // to get at least 10 splits
+    setCmds.add("set tez.grouping.split-waves=10");
+    if (more != null) {
+      setCmds.addAll(Arrays.asList(more));
+    }
+    return setCmds;
+  }
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 57f6c66..5dc4a1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -29,6 +29,9 @@
 import com.google.common.base.Preconditions;
 
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.tez.common.counters.TezCounters;
@@ -86,13 +89,15 @@
   private final MapWork work;
   private final SplitGrouper splitGrouper = new SplitGrouper();
   private final SplitLocationProvider splitLocationProvider;
+  private boolean generateSingleSplit;
 
-  public HiveSplitGenerator(Configuration conf, MapWork work) throws IOException {
+  public HiveSplitGenerator(Configuration conf, MapWork work, final boolean generateSingleSplit) throws IOException {
     super(null);
 
     this.conf = conf;
     this.work = work;
     this.jobConf = new JobConf(conf);
+    this.generateSingleSplit = generateSingleSplit;
 
     // Assuming grouping enabled always.
     userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build();
@@ -199,8 +204,27 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE
           conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
             TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
-      // Raw splits
-      InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+      InputSplit[] splits;
+      if (generateSingleSplit) {
+        splits = new InputSplit[1];
+        List<Path> paths = Utilities.getInputPathsTez(jobConf, Utilities.getMapWork(jobConf));
+        FileSystem fs = paths.get(0).getFileSystem(jobConf);
+        FileStatus[] fileStatuses = fs.listStatus(paths.get(0));
+        FileStatus fileStatus = fileStatuses[0];
+        Preconditions.checkState(paths.size() == 1 && fileStatuses.length == 1, "Requested to generate single " +
+          "split. Paths and fileStatuses are expected to be 1. Got paths: " + paths.size() + " fileStatuses: " +
+          fileStatuses.length);
+        BlockLocation[] locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+        Set<String> hostsSet = new HashSet<>();
+        for (BlockLocation location : locations) {
+          hostsSet.addAll(Lists.newArrayList(location.getHosts()));
+        }
+        String[] hosts = hostsSet.toArray(new String[0]);
+        splits[0] = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts);
+      } else {
+        // Raw splits
+        splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+      }
 
       // Sort the splits, so that subsequent grouping is consistent.
       Arrays.sort(splits, new InputSplitComparator());
       LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
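The generateSingleSplit branch above collapses the entire input into one FileSplit that spans the whole file and is located on the union of the file's block hosts. The following standalone sketch shows the same idea outside of HiveSplitGenerator and Tez; the helper name is illustrative, and it assumes exactly one input file, just as the Preconditions check in the patch does.

// Illustrative sketch only: mirrors the generateSingleSplit branch for a single input file.
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

public class SingleSplitSketch {
  // Builds one split covering the entire file, located on every host that holds any of its blocks.
  static FileSplit makeSingleSplit(JobConf jobConf, Path inputPath) throws IOException {
    FileSystem fs = inputPath.getFileSystem(jobConf);
    FileStatus fileStatus = fs.getFileStatus(inputPath);
    BlockLocation[] locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    Set<String> hosts = new HashSet<>();
    for (BlockLocation location : locations) {
      for (String host : location.getHosts()) {
        hosts.add(host);
      }
    }
    // Offset 0 and length = file length: the one split reads the whole file sequentially.
    return new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts.toArray(new String[0]));
  }
}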
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 20d0961..cec44e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -40,7 +40,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -88,6 +87,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SplitLocationInfo;
@@ -130,6 +130,7 @@
   protected transient IntObjectInspector intOI;
   protected transient JobConf jc;
   private boolean orderByQuery;
+  private boolean forceSingleSplit;
   private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
   private DataOutput dos = new DataOutputStream(bos);
@@ -204,14 +205,12 @@ public void process(Object[] arguments) throws HiveException {
     TezWork tezWork = fragment.work;
     Schema schema = fragment.schema;
 
-    if (orderByQuery) {
-      jc.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH, false);
-      jc.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT, true);
-      jc.setInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT, 1);
-    }
+    boolean generateSingleSplit = forceSingleSplit && orderByQuery;
     try {
-      InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId);
-      if (orderByQuery && splits.length > 1) {
+      InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit);
+      LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, query,
+        orderByQuery, forceSingleSplit);
+      if (generateSingleSplit && splits.length > 1) {
         throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + query);
       }
       for (InputSplit s : splits) {
@@ -243,6 +242,9 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli
     // Tez/LLAP requires RPC query plan
     HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
     HiveConf.setBoolVar(conf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, false);
+    // spark-llap always wraps the query in a subquery; until that is removed from spark-llap, the Hive
+    // compiler would remove the inner order by. Disable that optimization until then.
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY, false);
 
     try {
       jc = DagUtils.getInstance().createConfiguration(conf);
@@ -267,9 +269,9 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli
       }
 
       QueryPlan plan = driver.getPlan();
-      if (plan.getQueryProperties().hasOuterOrderBy()) {
-        orderByQuery = true;
-      }
+      orderByQuery = plan.getQueryProperties().hasOrderBy() || plan.getQueryProperties().hasOuterOrderBy();
+      forceSingleSplit = orderByQuery &&
+        HiveConf.getBoolVar(conf, ConfVars.LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT);
       List<Task<?>> roots = plan.getRootTasks();
       Schema schema = convertSchema(plan.getResultSchema());
       if(num == 0) {
@@ -364,7 +366,8 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli
     }
   }
 
-  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId)
+  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId,
+      final boolean generateSingleSplit)
       throws IOException {
 
     if(numSplits == 0) {
@@ -412,10 +415,8 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli
       Preconditions.checkState(HiveConf.getBoolVar(wxConf, ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
-
-      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork);
+      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork, generateSingleSplit);
       List<Event> eventList = splitGenerator.initialize();
-
       InputSplit[] result = new InputSplit[eventList.size() - 1];
 
       InputConfigureVertexTasksEvent configureEvent
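Because spark-llap submits the user query wrapped in a subquery, the patch keeps hive.remove.orderby.in.subquery disabled during get_splits compilation so the inner ORDER BY survives and still triggers the single-split behavior. The sketch below shows that wrapped shape over plain JDBC, matching the third case in the test above; it is illustrative only, and the connection details and table name are placeholders.

// Illustrative sketch (not part of the patch): the wrapped-query shape mirrors the test's third case.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class WrappedOrderBySketch {
  public static void main(String[] args) throws Exception {
    String wrapped = "select `value` from (select value from testtab1 where value is not null order by value) as t";
    try (Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
         Statement stmt = con.createStatement();
         // The inner ORDER BY is not optimized away, so this still yields a single split by default.
         ResultSet rs = stmt.executeQuery("select get_splits('" + wrapped + "', 5)")) {
      int splits = 0;
      while (rs.next()) {
        splits++;
      }
      System.out.println("Splits for wrapped ORDER BY query: " + splits); // expected: 1 with the default setting
    }
  }
}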