commit 5c3fdfaba3d6bf6b3489154e3ccdbe0fb3f10e2f Author: Sahil Takiar Date: Thu Jun 7 09:40:22 2018 -0500 HIVE-19821: Distributed HiveServer2 diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 933bda4ad0..0f0554511b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3017,6 +3017,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."), HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS("hive.server2.zookeeper.publish.configs", true, "Whether we should publish HiveServer2's configs to ZooKeeper."), + HIVE_SERVER2_ENABLE_CONTAINER_SERVICE("hive.server2.enable.container.service", false, + "Whether to enable running queries for a session in a dedicated remote process."), // HiveServer2 global init file location HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}", diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSparkAndRemoteProcessDriver.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSparkAndRemoteProcessDriver.java new file mode 100644 index 0000000000..7c2ba9520a --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSparkAndRemoteProcessDriver.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.session.HiveSessionHook; +import org.apache.hive.service.cli.session.HiveSessionHookContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * This class is cloned from TestJdbcWithMiniMR, except use Spark as the execution engine. 
+ * + * This is copied from TestJdbcWithLocalClusterSpark with the following modifications: + * * Uses a remote metastore + * * Sets ObjectStore.setTwoMetastoreTesting(boolean) to true to prevent some "Persistence + * Manager has been closed" errors + * * Sets hive.security.authorization.manager back to its default value to avoid having to add a + * dependency on the itests jars + */ +public class TestJdbcWithLocalClusterSparkAndRemoteProcessDriver { + public static final String TEST_TAG = "miniHS2.localClusterSpark.tag"; + public static final String TEST_TAG_VALUE = "miniHS2.localClusterSpark.value"; + public static class LocalClusterSparkSessionHook implements HiveSessionHook { + @Override + public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException { + sessionHookContext.getSessionConf().set(TEST_TAG, TEST_TAG_VALUE); + } + } + + private static MiniHS2 miniHS2 = null; + private static HiveConf conf; + private static Path dataFilePath; + private static String dbName = "mrTestDb"; + private Connection hs2Conn = null; + private Statement stmt; + + private static HiveConf createHiveConf() { + HiveConf conf = new HiveConf(); + conf.set("hive.execution.engine", "spark"); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.master", "local-cluster[2,2,1024]"); + conf.set("hive.spark.client.connect.timeout", "30000ms"); + // FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout + // while spark2 is still using Hadoop2. + // Spark requires Hive to support Hadoop3 first then Spark can start + // working on Hadoop3 support. Remove this after Spark supports Hadoop3. + conf.set("dfs.client.datanode-restart.timeout", "30"); + conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), + "TestJdbcWithLocalClusterSpark-local-dir").toString()); + conf.set("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + conf.set("hive.server2.enable.container.service", "true"); + return conf; + } + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + conf = createHiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + String dataFileDir = conf.get("test.data.files").replace('\\', '/') + .replace("c:", ""); + dataFilePath = new Path(dataFileDir, "kv1.txt"); + DriverManager.setLoginTimeout(0); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + miniHS2 = new MiniHS2.Builder().withRemoteMetastore().withConf(conf).withMiniMR().build(); + Map overlayProps = new HashMap(); + overlayProps.put(ConfVars.HIVE_SERVER2_SESSION_HOOK.varname, + LocalClusterSparkSessionHook.class.getName()); + miniHS2.start(overlayProps); + ObjectStore.setTwoMetastoreTesting(true); + createDb(); + } + + // setup DB + private static void createDb() throws Exception { + Connection conn = DriverManager. 
+ getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + Statement stmt2 = conn.createStatement(); + stmt2.execute("DROP DATABASE IF EXISTS " + dbName + " CASCADE"); + stmt2.execute("CREATE DATABASE " + dbName); + stmt2.close(); + conn.close(); + } + + @Before + public void setUp() throws Exception { + hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(dbName), + System.getProperty("user.name"), "bar"); + stmt = hs2Conn.createStatement(); + stmt.execute("USE " + dbName); + } + + @After + public void tearDown() throws Exception { + if (hs2Conn != null) { + hs2Conn.close(); + } + } + + @AfterClass + public static void afterTest() throws Exception { + if (miniHS2 != null && miniHS2.isStarted()) { + miniHS2.stop(); + } + } + + /** + * Verify that the connection to HS2 with MiniMr is successful. + * @throws Exception + */ + @Test + public void testConnection() throws Exception { + // the session hook should set the property + verifyProperty(TEST_TAG, TEST_TAG_VALUE); + } + + /** + * Run nonMr query. + * @throws Exception + */ + @Test + public void testNonSparkQuery() throws Exception { + String tableName = "testTab1"; + String resultVal = "val_238"; + String queryStr = "SELECT * FROM " + tableName; + + testKvQuery(tableName, queryStr, resultVal); + } + + /** + * Run nonMr query. + * @throws Exception + */ + @Test + public void testSparkQuery() throws Exception { + String tableName = "testTab2"; + String resultVal = "val_238"; + String queryStr = "SELECT * FROM " + tableName + + " where value = '" + resultVal + "'"; + + testKvQuery(tableName, queryStr, resultVal); + } + + @Test + public void testPermFunc() throws Exception { + + // This test assumes the hive-contrib JAR has been built as part of the Hive build. + // Also dependent on the UDFExampleAdd class within that JAR. 
+ String udfClassName = "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd"; + String mvnRepo = System.getProperty("maven.local.repository"); + String hiveVersion = System.getProperty("hive.version"); + String jarFileName = "hive-contrib-" + hiveVersion + ".jar"; + String[] pathParts = { + "org", "apache", "hive", + "hive-contrib", hiveVersion, jarFileName + }; + + // Create path to hive-contrib JAR on local filesystem + Path contribJarPath = new Path(mvnRepo); + for (String pathPart : pathParts) { + contribJarPath = new Path(contribJarPath, pathPart); + } + FileSystem localFs = FileSystem.getLocal(conf); + assertTrue("Hive contrib JAR exists at " + contribJarPath, localFs.exists(contribJarPath)); + + String hdfsJarPathStr = "hdfs:///" + jarFileName; + Path hdfsJarPath = new Path(hdfsJarPathStr); + + // Copy JAR to DFS + FileSystem dfs = miniHS2.getDFS().getFileSystem(); + dfs.copyFromLocalFile(contribJarPath, hdfsJarPath); + assertTrue("Verify contrib JAR copied to HDFS at " + hdfsJarPath, dfs.exists(hdfsJarPath)); + + // Register function + String queryStr = "CREATE FUNCTION example_add AS '" + udfClassName + "'" + + " USING JAR '" + hdfsJarPathStr + "'"; + stmt.execute(queryStr); + + // Call describe + ResultSet res; + res = stmt.executeQuery("DESCRIBE FUNCTION " + dbName + ".example_add"); + checkForNotExist(res); + + // Use UDF in query + String tableName = "testTab3"; + setupKv1Tabs(tableName); + res = stmt.executeQuery("SELECT EXAMPLE_ADD(1, 2) FROM " + tableName + " LIMIT 1"); + assertTrue("query has results", res.next()); + assertEquals(3, res.getInt(1)); + assertFalse("no more results", res.next()); + + // A new connection should be able to call describe/use function without issue + Connection conn2 = DriverManager.getConnection(miniHS2.getJdbcURL(dbName), + System.getProperty("user.name"), "bar"); + Statement stmt2 = conn2.createStatement(); + stmt2.execute("USE " + dbName); + res = stmt2.executeQuery("DESCRIBE FUNCTION " + dbName + ".example_add"); + checkForNotExist(res); + + res = stmt2.executeQuery("SELECT " + dbName + ".example_add(1, 1) FROM " + tableName + " LIMIT 1"); + assertTrue("query has results", res.next()); + assertEquals(2, res.getInt(1)); + assertFalse("no more results", res.next()); + + stmt.execute("DROP TABLE " + tableName); + } + + @Test + public void testTempTable() throws Exception { + // Create temp table with current connection + String tempTableName = "tmp1"; + stmt.execute("CREATE TEMPORARY TABLE " + tempTableName + " (key string, value string)"); + stmt.execute("load data local inpath '" + + dataFilePath.toString() + "' into table " + tempTableName); + + String resultVal = "val_238"; + String queryStr = "SELECT * FROM " + tempTableName + + " where value = '" + resultVal + "'"; + verifyResult(queryStr, resultVal, 2); + + // A second connection should not be able to see the table + Connection conn2 = DriverManager.getConnection(miniHS2.getJdbcURL(dbName), + System.getProperty("user.name"), "bar"); + Statement stmt2 = conn2.createStatement(); + stmt2.execute("USE " + dbName); + boolean gotException = false; + try { + stmt2.executeQuery(queryStr); + } catch (SQLException err) { + // This is expected to fail. 
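+      // A temporary table is visible only to the session that created it, so the second connection cannot resolve the name and the query fails with "Table not found", which is what the assertion below checks.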
+ assertTrue("Expecting table not found error, instead got: " + err, + err.getMessage().contains("Table not found")); + gotException = true; + } + assertTrue("Exception while querying non-existing temp table", gotException); + } + + private void checkForNotExist(ResultSet res) throws Exception { + int numRows = 0; + while (res.next()) { + numRows++; + String strVal = res.getString(1); + assertEquals("Should not find 'not exist'", -1, strVal.toLowerCase().indexOf("not exist")); + } + assertTrue("Rows returned from describe function", numRows > 0); + } + + /** + * Verify if the given property contains the expected value. + * @param propertyName + * @param expectedValue + * @throws Exception + */ + private void verifyProperty(String propertyName, String expectedValue) throws Exception { + Statement stmt = hs2Conn .createStatement(); + ResultSet res = stmt.executeQuery("set " + propertyName); + assertTrue(res.next()); + String[] results = res.getString(1).split("="); + assertEquals("Property should be set", results.length, 2); + assertEquals("Property should be set", expectedValue, results[1]); + } + + // create tables, verify query + private void testKvQuery(String tableName, String queryStr, String resultVal) + throws SQLException { + setupKv1Tabs(tableName); + verifyResult(queryStr, resultVal, 2); + stmt.execute("DROP TABLE " + tableName); + } + + // create table and pupulate with kv1.txt + private void setupKv1Tabs(String tableName) throws SQLException { + Statement stmt = hs2Conn.createStatement(); + // create table + stmt.execute("CREATE TABLE " + tableName + + " (under_col INT COMMENT 'the under column', value STRING)" + + " COMMENT ' test table'"); + + // load data + stmt.execute("load data local inpath '" + + dataFilePath.toString() + "' into table " + tableName); + } + + // run given query and validate expecated result + private void verifyResult(String queryStr, String expString, int colPos) + throws SQLException { + ResultSet res = stmt.executeQuery(queryStr); + assertTrue(res.next()); + assertEquals(expString, res.getString(colPos)); + res.close(); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSparkAndRemoteProcessDriver.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSparkAndRemoteProcessDriver.java new file mode 100644 index 0000000000..d757cd0771 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSparkAndRemoteProcessDriver.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.jdbc; + +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.session.HiveSessionHook; +import org.apache.hive.service.cli.session.HiveSessionHookContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * + * This is copied from TestMultiSessionsHS2WithLocalClusterSpark with the following modifications: + * * Uses a remote metastore + * * Sets ObjectStore.setTwoMetastoreTesting(boolean) to true to prevent some "Persistence + * Manager has been closed" errors + * * Sets hive.security.authorization.manager back to its default value to avoid having to add a + * dependency on the itests jars + */ +public class TestMultiSessionsHS2WithLocalClusterSparkAndRemoteProcessDriver { + public static final String TEST_TAG = "miniHS2.localClusterSpark.tag"; + public static final String TEST_TAG_VALUE = "miniHS2.localClusterSpark.value"; + private static final int PARALLEL_NUMBER = 3; + + public static class LocalClusterSparkSessionHook implements HiveSessionHook { + @Override + public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException { + sessionHookContext.getSessionConf().set(TEST_TAG, TEST_TAG_VALUE); + } + } + + private static MiniHS2 miniHS2 = null; + private static HiveConf conf; + private static Path dataFilePath; + private static String dbName = "sparkTestDb"; + private ThreadLocal localConnection = new ThreadLocal(); + private ThreadLocal localStatement = new ThreadLocal(); + private ExecutorService pool = null; + + + private static HiveConf createHiveConf() { + HiveConf conf = new HiveConf(); + conf.set("hive.exec.parallel", "true"); + conf.set("hive.execution.engine", "spark"); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.master", "local-cluster[2,2,1024]"); + conf.set("spark.deploy.defaultCores", "2"); + conf.set("hive.spark.client.connect.timeout", "30000ms"); + // FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout + // while spark2 is still using Hadoop2. + // Spark requires Hive to support Hadoop3 first then Spark can start + // working on Hadoop3 support. Remove this after Spark supports Hadoop3. 
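+    // (Hadoop 3 expresses this default with a time unit, e.g. "30s", which the Hadoop 2 HDFS client bundled with Spark cannot parse; hence the unit-less override below.)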
+ conf.set("dfs.client.datanode-restart.timeout", "30"); + conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"), + "TestMultiSessionsHS2WithLocalClusterSpark-local-dir").toString()); + conf.set("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + conf.set("hive.server2.enable.container.service", "true"); + return conf; + } + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + conf = createHiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + String dataFileDir = conf.get("test.data.files").replace('\\', '/') + .replace("c:", ""); + dataFilePath = new Path(dataFileDir, "kv1.txt"); + DriverManager.setLoginTimeout(0); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + miniHS2 = new MiniHS2.Builder().withRemoteMetastore().withConf(conf).withMiniMR().build(); + Map overlayProps = new HashMap(); + overlayProps.put(ConfVars.HIVE_SERVER2_SESSION_HOOK.varname, + LocalClusterSparkSessionHook.class.getName()); + miniHS2.start(overlayProps); + ObjectStore.setTwoMetastoreTesting(true); + createDb(); + } + + // setup DB + private static void createDb() throws Exception { + Connection conn = DriverManager. + getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + Statement stmt2 = conn.createStatement(); + stmt2.execute("DROP DATABASE IF EXISTS " + dbName + " CASCADE"); + stmt2.execute("CREATE DATABASE " + dbName); + stmt2.close(); + conn.close(); + } + + @Before + public void setUp() throws Exception { + pool = Executors.newFixedThreadPool(PARALLEL_NUMBER, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Test-Thread-%d").build()); + createConnection(); + } + + @After + public void tearDown() throws Exception { + pool.shutdownNow(); + closeConnection(); + } + + private void createConnection() throws Exception { + Connection connection = DriverManager.getConnection(miniHS2.getJdbcURL(dbName), + System.getProperty("user.name"), "bar"); + Statement statement = connection.createStatement(); + localConnection.set(connection); + localStatement.set(statement); + statement.execute("USE " + dbName); + } + + private void closeConnection() throws SQLException { + if (localStatement.get() != null) { + localStatement.get().close(); + } + + if (localConnection.get() != null) { + localConnection.get().close(); + } + } + + @AfterClass + public static void afterTest() throws Exception { + if (miniHS2 != null && miniHS2.isStarted()) { + miniHS2.stop(); + } + } + + /** + * Run nonSpark query + * + * @throws Exception + */ + @Test + public void testNonSparkQuery() throws Exception { + String tableName = "kvTable1"; + setupTable(tableName); + Callable runNonSparkQuery = getNonSparkQueryCallable(tableName); + runInParallel(runNonSparkQuery); + dropTable(tableName); + } + + /** + * Run spark query + * + * @throws Exception + */ + @Test + public void testSparkQuery() throws Exception { + String tableName = "kvTable2"; + setupTable(tableName); + Callable runSparkQuery = getSparkQueryCallable(tableName); + runInParallel(runSparkQuery); + dropTable(tableName); + } + + private void runInParallel(Callable runNonSparkQuery) throws InterruptedException, ExecutionException { + List futureList = new LinkedList(); + for (int i = 0; i < PARALLEL_NUMBER; i++) { + Future future = pool.submit(runNonSparkQuery); + futureList.add(future); + } + + for (Future future : futureList) { + future.get(); + } + } + + 
private Callable getNonSparkQueryCallable(final String tableName) { + return new Callable() { + @Override + public Void call() throws Exception { + String resultVal = "val_238"; + String queryStr = "SELECT * FROM " + tableName; + testKvQuery(queryStr, resultVal); + return null; + } + }; + } + + private Callable getSparkQueryCallable(final String tableName) { + return new Callable() { + @Override + public Void call() throws Exception { + String resultVal = "val_238"; + String queryStr = "SELECT * FROM " + tableName + + " where value = '" + resultVal + "'"; + testKvQuery(queryStr, resultVal); + return null; + } + }; + } + + private void testKvQuery(String queryStr, String resultVal) + throws Exception { + createConnection(); + verifyResult(queryStr, resultVal, 2); + closeConnection(); + } + + // create table and load kv1.txt + private void setupTable(String tableName) throws SQLException { + Statement statement = localStatement.get(); + // create table + statement.execute("CREATE TABLE " + tableName + + " (under_col INT COMMENT 'the under column', value STRING)" + + " COMMENT ' test table'"); + + // load data + statement.execute("LOAD DATA LOCAL INPATH '" + + dataFilePath.toString() + "' INTO TABLE " + tableName); + } + + private void dropTable(String tableName) throws SQLException { + localStatement.get().execute("DROP TABLE " + tableName); + } + + // run given query and validate expected result + private void verifyResult(String queryStr, String expString, int colPos) + throws SQLException { + ResultSet res = localStatement.get().executeQuery(queryStr); + assertTrue(res.next()); + assertEquals(expString, res.getString(colPos)); + res.close(); + } +} diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index f19a3ad208..c1df4648ec 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -1299,7 +1299,7 @@ public void init() throws Exception { db.getConf().set("hive.server2.materializedviews.registry.impl", "DUMMY"); HiveMaterializedViewsRegistry.get().init(db); db.getConf().set("hive.server2.materializedviews.registry.impl", registryImpl); - drv = DriverFactory.newDriver(conf); + drv = DriverFactory.newRemoteProcessDriver(conf); pd = new ParseDriver(); sem = new SemanticAnalyzer(queryState); } diff --git a/ql/pom.xml b/ql/pom.xml index 0c181e515c..655f30ab96 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -946,6 +946,21 @@ org.apache.orc:orc-shims org.apache.orc:orc-tools joda-time:joda-time + org.apache.logging.log4j:log4j-api + org.apache.calcite.avatica:avatica + org.apache.calcite:calcite-core + org.apache.calcite:calcite-linq4j + org.apache.calcite:calcite-druid + org.apache.thrift:libfb303 + org.datanucleus:datanucleus-core + org.datanucleus:datanucleus-api-jdo + org.datanucleus:datanucleus-rdbms + org.datanucleus:javax.jdo + commons-pool:commons-pool + commons-dbcp:commons-dbcp + org.antlr:antlr-runtime + javax.jdo:jdo-api + org.apache.hbase:hbase-common diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java index 0f6a80ef0d..56d1a15db6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java @@ -18,10 +18,12 @@ package org.apache.hadoop.hive.ql; +import java.io.IOException; import java.util.ArrayList; import org.apache.hadoop.hive.conf.HiveConf; 
import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.reexec.IReExecutionPlugin; import org.apache.hadoop.hive.ql.reexec.ReExecDriver; import org.apache.hadoop.hive.ql.reexec.ReExecutionOverlayPlugin; @@ -34,6 +36,30 @@ */ public class DriverFactory { + /** + * If {@link HiveConf.ConfVars#HIVE_SERVER2_ENABLE_CONTAINER_SERVICE} is true, return a + * {@link RemoteProcessDriver}, else delegate to {@link #newDriver(HiveConf)}. + */ + public static IDriver newRemoteProcessDriver(HiveConf conf) throws IOException, HiveException { + if (conf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE)) { + return new RemoteProcessDriver(conf); + } else { + return newDriver(conf); + } + } + + /** + * If {@link HiveConf.ConfVars#HIVE_SERVER2_ENABLE_CONTAINER_SERVICE} is true, return a + * {@link RemoteProcessDriver}, else delegate to {@link #newDriver(QueryState, String, QueryInfo)}. + */ + public static IDriver newRemoteProcessDriver(QueryState queryState, String userName, QueryInfo queryInfo) throws IOException, HiveException { + if (queryState.getConf().getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE)) { + return new RemoteProcessDriver(queryState, userName, queryInfo); + } else { + return newDriver(queryState, userName, queryInfo); + } + } + public static IDriver newDriver(HiveConf conf) { return newDriver(getNewQueryState(conf), null, null); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClient.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClient.java new file mode 100644 index 0000000000..78b04b196d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClient.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; + +import java.io.IOException; +import java.util.List; + + +/** + * API that defines how to run {@link IDriver} methods in a remote process. Used by + * {@link RemoteProcessDriver}. 
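Before the client interface below, a rough usage sketch of the new DriverFactory entry points above (illustrative only, not part of the patch; exception handling is elided and an active SessionState on the calling thread is assumed):

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE, true);
    // Flag on: a RemoteProcessDriver is returned; flag off (the default): falls back to newDriver(conf).
    IDriver driver = DriverFactory.newRemoteProcessDriver(conf);
    CommandProcessorResponse resp = driver.run("SELECT 1");   // forwarded to the remote process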
+ */ +interface RemoteProcessClient { + + CommandProcessorResponse run(String statement) throws Exception; + + boolean getResults(List res) throws IOException; + + CommandProcessorResponse compileAndRespond(String statement); + + CommandProcessorResponse run(); + + boolean hasResultSet(); + + Schema getSchema(); + + boolean isFetchingTable(); + + void close(); + + void destroy(); + + QueryDisplay getQueryDisplay(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClientFactory.java new file mode 100644 index 0000000000..e1d24c6ab6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClientFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient; +import org.apache.hadoop.hive.ql.exec.spark.RemoteProcessHiveSparkClientImpl; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * Creates a {@link RemoteProcessClient} for a {@link RemoteProcessDriver}. + */ +public class RemoteProcessClientFactory { + + public static RemoteProcessClient createRemoteProcessClient(HiveConf hiveConf, String queryId) { + + if ("spark".equals(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) { + SparkSession sparkSession; + try { + sparkSession = SparkUtilities.getSparkSession(hiveConf, + SparkSessionManagerImpl.getInstance()); + } catch (HiveException e) { + throw new RuntimeException(e); + } + if (!(sparkSession.getHiveSparkClient() instanceof RemoteHiveSparkClient)) { + throw new IllegalArgumentException(); + } + RemoteHiveSparkClient remoteHiveSparkClient = (RemoteHiveSparkClient) sparkSession.getHiveSparkClient(); + return new SparkRemoteProcessClient(queryId, hiveConf, new RemoteProcessHiveSparkClientImpl(queryId, remoteHiveSparkClient.getSparkClient().getClientProtocol())); + } + throw new IllegalArgumentException(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessDriver.java new file mode 100644 index 0000000000..eba396ae8a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessDriver.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; + +import java.io.IOException; +import java.util.List; + + +/** + * Runs a {@link IDriver} in a remote process. + */ +public class RemoteProcessDriver implements IDriver { + + private final HiveConf hiveConf; + private final QueryState queryState; + private final String userName; + private final QueryInfo queryInfo; + private final RemoteProcessClient remoteProcessClient; + + public RemoteProcessDriver(HiveConf hiveConf) throws IOException, HiveException { + this(new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(hiveConf).build(), null, + null); + } + + public RemoteProcessDriver(QueryState queryState, String userName, QueryInfo queryInfo) throws IOException, HiveException { + this.hiveConf = queryState.getConf(); + this.queryState = queryState; + this.userName = userName; + this.queryInfo = queryInfo; + this.remoteProcessClient = createRemoteProcessClient(queryState.getConf(), queryState.getQueryId()); + } + + @Override + public int compile(String string) { + return 0; + } + + @Override + public CommandProcessorResponse compileAndRespond(String statement) { + return this.remoteProcessClient.compileAndRespond(statement); + } + + @Override + public QueryPlan getPlan() { + return null; + } + + @Override + public QueryDisplay getQueryDisplay() { + return new QueryDisplay() { + @Override + public synchronized List getTaskDisplays() { + return null; + } + }; + } + + @Override + public void setOperationId(String guid64) { + + } + + @Override + public CommandProcessorResponse run() { + return this.remoteProcessClient.run(); + } + + @Override + public CommandProcessorResponse run(String command) { + try { + return this.remoteProcessClient.run(command); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean getResults(List res) throws IOException { + return this.remoteProcessClient.getResults(res); + } + + @Override + public void setMaxRows(int maxRows) { + + } + + @Override + public FetchTask getFetchTask() { + return null; + } + + @Override + public Schema getSchema() { + return this.remoteProcessClient.getSchema(); + } + + @Override + public boolean isFetchingTable() { + return this.remoteProcessClient.isFetchingTable(); + } + + @Override + public void resetFetch() { + + } + + @Override + public void close() { + this.remoteProcessClient.close(); + } + + @Override + public void destroy() { + this.remoteProcessClient.destroy(); + } + + @Override + public HiveConf getConf() { + return null; + } + + /** + * Don't support getting the {@link Context} because it requires serializing the entire 
context + * object. This method is mostly used for the {@link org.apache.hadoop.hive.ql.reexec.ReExecDriver} + * and various unit tests. + */ + @Override + public Context getContext() { + throw new UnsupportedOperationException( + "RemoteProcessDriver does not support getting the Semantic Analyzer Context"); + } + + @Override + public boolean hasResultSet() { + return this.remoteProcessClient.hasResultSet(); + } + + private RemoteProcessClient createRemoteProcessClient(HiveConf hiveConf, + String queryId) throws IOException, HiveException { + SessionState.get().launchRemoteProcess(); + return RemoteProcessClientFactory.createRemoteProcessClient(hiveConf, queryId); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncher.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncher.java new file mode 100644 index 0000000000..907859b225 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncher.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +import java.io.IOException; + + +/** + * Launches the remote process that will be used by the {@link RemoteProcessDriver}. + */ +public interface RemoteProcessLauncher { + + void launch() throws IOException, HiveException; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncherFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncherFactory.java new file mode 100644 index 0000000000..0661821b2c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncherFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; + + +/** + * Creates a {@link RemoteProcessLauncher}. 
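Only the Spark launcher ships with this patch (the factory below throws IllegalArgumentException for any other engine). As a hypothetical sketch, a second engine would only need to implement the single launch() method; the class here is illustrative, not part of the patch:

    /** Hypothetical launcher for some other runtime; imports as in RemoteProcessLauncher.java. */
    class MyEngineRemoteProcessLauncher implements RemoteProcessLauncher {
      private final HiveConf hiveConf;
      MyEngineRemoteProcessLauncher(HiveConf hiveConf) { this.hiveConf = hiveConf; }
      @Override
      public void launch() throws IOException, HiveException {
        // Start (or attach to) the session's dedicated process and hand it the serialized
        // HiveConf, analogous to what SparkRemoteProcessLauncher does further down.
        throw new UnsupportedOperationException("illustrative stub");
      }
    }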
+ */ +public class RemoteProcessLauncherFactory { + + public static RemoteProcessLauncher getRemoteProcessLauncher(HiveConf hiveConf) { + if ("spark".equals(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) { + return new SparkRemoteProcessLauncher(hiveConf); + } + throw new IllegalArgumentException(); + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessClient.java b/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessClient.java new file mode 100644 index 0000000000..7259038768 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessClient.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer; +import org.apache.hadoop.hive.ql.exec.spark.RemoteProcessHiveSparkClient; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; + +import java.io.IOException; +import java.util.List; + + +/** + * A {@link RemoteProcessClient} that uses a Spark driver to run all remote driver operations. It + * uses a {@link RemoteProcessHiveSparkClient} to interact with the remote driver. 
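Despite the class name, the serializeHiveConf/deserializeHiveConf helpers this client relies on (added to KryoSerializer further down) use Hadoop's Writable serialization (HiveConf.write/readFields), not Kryo. The client below re-serializes the session HiveConf on every run(String) and compileAndRespond(String) call; a minimal round trip, for illustration:

    byte[] confBytes = KryoSerializer.serializeHiveConf(hiveConf);      // Configuration.write(...)
    HiveConf rebuilt = KryoSerializer.deserializeHiveConf(confBytes);   // Configuration.readFields(...)
    // rebuilt carries the caller's settings, e.g. hive.server2.enable.container.service=true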
+ */ +class SparkRemoteProcessClient implements RemoteProcessClient { + + private final String queryId; + private final HiveConf hiveConf; + private final RemoteProcessHiveSparkClient remoteProcessHiveSparkClient; + private PerfLogger perfLogger; + + SparkRemoteProcessClient(String queryId, HiveConf hiveConf, + RemoteProcessHiveSparkClient remoteProcessHiveSparkClient) { + this.queryId = queryId; + this.hiveConf = hiveConf; + this.remoteProcessHiveSparkClient = remoteProcessHiveSparkClient; + this.perfLogger = SessionState.getPerfLogger(); + } + + @Override + public CommandProcessorResponse run(String statement) { + this.perfLogger.PerfLogBegin(getClass().getSimpleName(), "serializeHiveConf"); + byte[] hiveConfBytes = KryoSerializer.serializeHiveConf(hiveConf); + this.perfLogger.PerfLogEnd(getClass().getSimpleName(), "serializeHiveConf"); + + return this.remoteProcessHiveSparkClient.run(statement, hiveConfBytes); + } + + @Override + public boolean getResults(List res) throws IOException { + return this.remoteProcessHiveSparkClient.getResults(res); + } + + @Override + public CommandProcessorResponse compileAndRespond(String statement) { + this.perfLogger.PerfLogBegin(getClass().getSimpleName(), "serializeHiveConf"); + byte[] hiveConfBytes = KryoSerializer.serializeHiveConf(hiveConf); + this.perfLogger.PerfLogEnd(getClass().getSimpleName(), "serializeHiveConf"); + return this.remoteProcessHiveSparkClient.compileAndRespond(statement, hiveConfBytes); + } + + @Override + public CommandProcessorResponse run() { + return this.remoteProcessHiveSparkClient.run(); + } + + @Override + public boolean hasResultSet() { + return this.remoteProcessHiveSparkClient.hasResultSet(); + } + + @Override + public Schema getSchema() { + return this.remoteProcessHiveSparkClient.getSchema(); + } + + @Override + public boolean isFetchingTable() { + return this.remoteProcessHiveSparkClient.isFetchingTable(); + } + + @Override + public void close() { + this.remoteProcessHiveSparkClient.close(); + } + + @Override + public void destroy() { + this.remoteProcessHiveSparkClient.destroy(); + } + + @Override + public QueryDisplay getQueryDisplay() { + return this.remoteProcessHiveSparkClient.getQueryDisplay(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessLauncher.java b/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessLauncher.java new file mode 100644 index 0000000000..8dddc9785b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessLauncher.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer; +import org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; +import org.apache.hadoop.hive.ql.metadata.HiveException; + + +/** + * A {@link RemoteProcessLauncher} that defines the remote process as the Spark driver. It uses + * the existing {@link SparkSession} and {@link org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager} + * logic to launch the {@link org.apache.hive.spark.client.RemoteDriver}. + */ +class SparkRemoteProcessLauncher implements RemoteProcessLauncher { + + private final HiveConf hiveConf; + + SparkRemoteProcessLauncher(HiveConf hiveConf) { + this.hiveConf = hiveConf; + } + + @Override + public void launch() throws HiveException { + SparkSession ss = SparkUtilities.getSparkSession(hiveConf, SparkSessionManagerImpl.getInstance()); + byte[] hiveConfBytes = KryoSerializer.serializeHiveConf(hiveConf); + ((RemoteHiveSparkClient) ss.getHiveSparkClient()).getSparkClient().getClientProtocol().startSession(hiveConfBytes); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 5ed5d4214e..e7d920aa43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -251,7 +251,7 @@ private static void addCredentialProviderPassword(Map sparkConf, sparkConf.put("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword); } - static SparkConf generateSparkConf(Map conf) { + public static SparkConf generateSparkConf(Map conf) { SparkConf sparkConf = new SparkConf(false); for (Map.Entry entry : conf.entrySet()) { sparkConf.set(entry.getKey(), entry.getValue()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java index f77e92cd2a..446f746e42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; @@ -64,6 +65,25 @@ return result; } + public static byte[] serializeHiveConf(HiveConf hiveConf) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + hiveConf.write(new DataOutputStream(out)); + } catch (IOException e) { + LOG.error("Error serializing job configuration: " + e, e); + return null; + } finally { + try { + out.close(); + } catch (IOException e) { + LOG.error("Error closing output stream: " + e, e); + } + } + + return out.toByteArray(); + + } + public static byte[] serializeJobConf(JobConf jobConf) { ByteArrayOutputStream out = new ByteArrayOutputStream(); try { @@ -94,4 +114,14 @@ public static JobConf deserializeJobConf(byte[] buffer) { return conf; } + public static HiveConf deserializeHiveConf(byte[] buffer) { + HiveConf conf = new HiveConf(); + try { + conf.readFields(new 
DataInputStream(new ByteArrayInputStream(buffer))); + } catch (IOException e) { + String msg = "Error de-serializing job configuration: " + e; + throw new IllegalStateException(msg, e); + } + return conf; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java index 72ff53e3bd..b04dc6d35f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.hadoop.hive.ql.exec.DagUtils; +import org.apache.hive.spark.client.SparkClient; import org.apache.hive.spark.client.SparkClientUtilities; import org.apache.spark.util.CallSite; import org.slf4j.Logger; @@ -88,6 +89,11 @@ public static synchronized LocalHiveSparkClient getInstance( private final JobMetricsListener jobMetricsListener; + public LocalHiveSparkClient(JavaSparkContext sc, JobMetricsListener jobMetricsListener) { + this.sc = sc; + this.jobMetricsListener = jobMetricsListener; + } + private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf) throws FileNotFoundException, MalformedURLException { String regJar = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index d31a202261..e990d50322 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -180,6 +180,10 @@ public int getDefaultParallelism() throws Exception { return handler.get(sparkClientTimtout, TimeUnit.SECONDS); } + public SparkClient getSparkClient() { + return remoteClient; + } + @Override public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessDriverExecutorFactoryImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessDriverExecutorFactoryImpl.java new file mode 100644 index 0000000000..e92d23891b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessDriverExecutorFactoryImpl.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverFactory; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.spark.client.RemoteProcessDriverExecutor; +import org.apache.hive.spark.client.RemoteProcessDriverExecutorFactory; + +import java.io.IOException; +import java.util.List; + + +/** + * A simple of implementation of {@link RemoteProcessDriverExecutorFactory} that creates + * {@link RemoteProcessDriverExecutor}s. + */ +public class RemoteProcessDriverExecutorFactoryImpl implements RemoteProcessDriverExecutorFactory { + + private final HiveConf hiveConf; + + public RemoteProcessDriverExecutorFactoryImpl(byte[] hiveConfBytes) { + this.hiveConf = KryoSerializer.deserializeHiveConf(hiveConfBytes); + SessionState.start(this.hiveConf); + } + + @Override + public RemoteProcessDriverExecutor createRemoteProcessDriverExecutor(byte[] hiveConfBytes) { + return new RemoteProcessDriverExecutorImpl(hiveConfBytes); + } + + /** + * A simple implementation of {@link RemoteProcessDriverExecutor} that delegates to the + * {@link IDriver} defined by {@link DriverFactory#newDriver(HiveConf)}. + */ + private static final class RemoteProcessDriverExecutorImpl implements RemoteProcessDriverExecutor { + + private HiveConf hiveConf; + private IDriver driver; + + private RemoteProcessDriverExecutorImpl(byte[] hiveConfBytes) { + this.hiveConf = KryoSerializer.deserializeHiveConf(hiveConfBytes); + this.driver = DriverFactory.newDriver(this.hiveConf); + } + + @Override + public Exception run(String command) { + return this.driver.run(command); + } + + @Override + public boolean getResults(List res) throws IOException { + return this.driver.getResults(res); + } + + @Override + public Exception run() { + return this.driver.run(); + } + + @Override + public Exception compileAndRespond(String command) { + return this.driver.compileAndRespond(command); + } + + @Override + public boolean hasResultSet() { + return this.driver.hasResultSet(); + } + + @Override + public byte[] getSchema() { + return KryoSerializer.serialize(this.driver.getSchema()); + } + + @Override + public boolean isFetchingTable() { + return this.driver.isFetchingTable(); + } + + @Override + public void close() { + this.driver.close(); + } + + @Override + public void destroy() { + this.driver.destroy(); + } + + @Override + public byte[] getQueryDisplay() { + return KryoSerializer.serialize(this.driver.getQueryDisplay()); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClient.java new file mode 100644 index 0000000000..5f477b58a8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClient.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.QueryDisplay; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + + +/** + * Similar to {@link HiveSparkClient} except it defines client methods to be used by + * {@link org.apache.hadoop.hive.ql.SparkRemoteProcessClient}. + */ +public interface RemoteProcessHiveSparkClient extends Serializable { + + CommandProcessorResponse run(String command, byte[] hiveConfBytes); + + boolean getResults(List res) throws IOException; + + CommandProcessorResponse compileAndRespond(String statement, byte[] hiveConfBytes); + + CommandProcessorResponse run(); + + boolean hasResultSet(); + + Schema getSchema(); + + boolean isFetchingTable(); + + void close(); + + void destroy(); + + QueryDisplay getQueryDisplay(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClientImpl.java new file mode 100644 index 0000000000..b1279e7ba6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClientImpl.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.QueryDisplay; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hive.spark.client.AbstractSparkClient; + +import java.util.List; + + +/** + * A {@link RemoteProcessHiveSparkClient} that uses {@link AbstractSparkClient.ClientProtocol} to + * run the necessary driver commands. 
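Every call in the implementation below is keyed by the query id, and the compile/run calls additionally carry the serialized HiveConf, presumably so the remote process can keep per-query state. A hedged sketch of one query's flow through this client (ordering inferred from how HiveServer2 normally drives an IDriver; error and IOException handling elided):

    byte[] confBytes = KryoSerializer.serializeHiveConf(hiveConf);
    CommandProcessorResponse compiled = client.compileAndRespond("SELECT * FROM src", confBytes);
    if (compiled.getResponseCode() == 0) {
      client.run();                                // execute the compiled plan remotely
      List<String> batch = new ArrayList<>();
      while (client.getResults(batch)) {           // fetch result batches until exhausted
        batch.clear();
      }
    }
    client.close();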
+ */ +public class RemoteProcessHiveSparkClientImpl implements RemoteProcessHiveSparkClient { + + + private final String queryId; + private final AbstractSparkClient.ClientProtocol clientProtocol; + + public RemoteProcessHiveSparkClientImpl(String queryId, + AbstractSparkClient.ClientProtocol clientProtocol) { + this.queryId = queryId; + this.clientProtocol = clientProtocol; + } + + @Override + public CommandProcessorResponse run(String command, byte[] hiveConfBytes) { + return (CommandProcessorResponse) this.clientProtocol.run(command, hiveConfBytes, this.queryId); + } + + @Override + public boolean getResults(List res) { + return this.clientProtocol.getResults(this.queryId, res); + } + + @Override + public CommandProcessorResponse compileAndRespond(String statement, byte[] hiveConfBytes) { + return (CommandProcessorResponse) this.clientProtocol.compileAndRespond(this.queryId, statement, + hiveConfBytes); + } + + @Override + public CommandProcessorResponse run() { + return (CommandProcessorResponse) this.clientProtocol.run(this.queryId); + } + + @Override + public boolean hasResultSet() { + return this.clientProtocol.hasResultSet(this.queryId); + } + + @Override + public Schema getSchema() { + return KryoSerializer.deserialize(this.clientProtocol.getSchema(this.queryId), Schema.class); + } + + @Override + public boolean isFetchingTable() { + return this.clientProtocol.isFetchingTable(this.queryId); + } + + @Override + public void close() { + this.clientProtocol.closeDriver(this.queryId); + } + + @Override + public void destroy() { + this.clientProtocol.destroyDriver(this.queryId); + } + + @Override + public QueryDisplay getQueryDisplay() { + return KryoSerializer.deserialize(this.clientProtocol.getQueryDisplay(this.queryId), + QueryDisplay.class); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 02613f2ca3..37ab69dcbc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -35,6 +35,9 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionWorkSubmitter; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkWorkSubmitter; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkWorkSubmitterFactory; import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkMetricsUtils; import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; @@ -112,12 +115,9 @@ public int execute(DriverContext driverContext) { int rc = 0; perfLogger = SessionState.getPerfLogger(); - SparkSession sparkSession = null; - SparkSessionManager sparkSessionManager = null; + SparkWorkSubmitter workSubmitter = null; try { printConfigInfo(); - sparkSessionManager = SparkSessionManagerImpl.getInstance(); - sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager); SparkWork sparkWork = getWork(); sparkWork.setRequiredCounterPrefix(getOperatorCounters()); @@ -125,7 +125,8 @@ public int execute(DriverContext driverContext) { // Submit the Spark job perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB); - jobRef = sparkSession.submit(driverContext, sparkWork); + workSubmitter = 
SparkWorkSubmitterFactory.getSparkWorkSubmitter(conf); + jobRef = workSubmitter.submit(driverContext, sparkWork); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); // If the driver context has been shutdown (due to query cancellation) kill the Spark job @@ -213,13 +214,8 @@ public int execute(DriverContext driverContext) { } finishTime = perfLogger.getEndTime(PerfLogger.SPARK_RUN_JOB); Utilities.clearWork(conf); - if (sparkSession != null && sparkSessionManager != null) { - rc = close(rc); - try { - sparkSessionManager.returnSession(sparkSession); - } catch (HiveException ex) { - LOG.error("Failed to return the session to SessionManager", ex); - } + if (workSubmitter != null) { + rc = workSubmitter.close(this, rc); } } return rc; @@ -355,7 +351,7 @@ static String sparkStatisticsToString(SparkStatistics sparkStatistic, int sparkJ * Close will move the temp files into the right place for the fetch * task. If the job has failed it will clean up the files. */ - private int close(int rc) { + public int close(int rc) { try { List ws = work.getAllWork(); for (BaseWork w: ws) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index fdc5361989..fc0f3272de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -55,6 +56,8 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hive.spark.client.SparkClientUtilities; import org.apache.spark.SparkConf; +import org.apache.spark.util.Utils; +import org.slf4j.Logger; /** @@ -339,4 +342,35 @@ public static String reverseDNSLookupURL(String url) throws UnknownHostException InetAddress address = InetAddress.getByName(uri.getHost()); return uri.getScheme() + "://" + address.getCanonicalHostName() + ":" + uri.getPort(); } + + public static ObjectPair getMemoryAndCores(Logger log, SparkConf sparkConf, int + numExecutors, int defaultParallelism) { + // at start-up, we may be unable to get number of executors + if (numExecutors <= 0) { + return new ObjectPair(-1L, -1); + } + int executorMemoryInMB = Utils.memoryStringToMb( + sparkConf.get("spark.executor.memory", "512m")); + double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6); + long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024); + int totalCores; + String masterURL = sparkConf.get("spark.master"); + if (masterURL.startsWith("spark") || masterURL.startsWith("local")) { + totalCores = sparkConf.contains("spark.default.parallelism") ? 
+ sparkConf.getInt("spark.default.parallelism", 1) : + defaultParallelism; + totalCores = Math.max(totalCores, numExecutors); + } else { + int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1); + totalCores = numExecutors * coresPerExecutor; + } + totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1); + + long memoryPerTaskInBytes = totalMemory / totalCores; + log.info("Hive on Spark application currently has number of executors: " + numExecutors + + ", total cores: " + totalCores + ", memory per executor: " + + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction); + return new ObjectPair(Long.valueOf(memoryPerTaskInBytes), + Integer.valueOf(totalCores)); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkLocalClientWorkSubmitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkLocalClientWorkSubmitter.java new file mode 100644 index 0000000000..5f2460d98b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkLocalClientWorkSubmitter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.session; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener; +import org.apache.hadoop.hive.ql.plan.SparkWork; + +import org.apache.hive.spark.client.RemoteDriver; + + +/** + * A {@link SparkWorkSubmitter} that submits {@link SparkWork} objects and runs them locally via a + * {@link LocalHiveSparkClient}.
+ */ +class SparkLocalClientWorkSubmitter implements SparkWorkSubmitter { + + @Override + public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception { + JobMetricsListener jobMetricsListener = new JobMetricsListener(); + RemoteDriver.getInstance().sc().sc().addSparkListener(jobMetricsListener); + return new LocalHiveSparkClient(RemoteDriver.getInstance().sc(), jobMetricsListener).execute(driverContext, sparkWork); + } + + @Override + public int close(SparkTask sparkTask, int rc) { + return sparkTask.close(rc); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java index f96a8f77ce..d163953bb8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.SparkWork; @@ -75,4 +76,6 @@ * Get an HDFS dir specific to the SparkSession * */ Path getHDFSSessionDir() throws IOException; -} + + HiveSparkClient getHiveSparkClient(); +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index c8cb1ac08c..c56c114959 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,35 +106,8 @@ public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) thro @Override public ObjectPair getMemoryAndCores() throws Exception { - SparkConf sparkConf = hiveSparkClient.getSparkConf(); - int numExecutors = hiveSparkClient.getExecutorCount(); - // at start-up, we may be unable to get number of executors - if (numExecutors <= 0) { - return new ObjectPair(-1L, -1); - } - int executorMemoryInMB = Utils.memoryStringToMb( - sparkConf.get("spark.executor.memory", "512m")); - double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6); - long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024); - int totalCores; - String masterURL = sparkConf.get("spark.master"); - if (masterURL.startsWith("spark") || masterURL.startsWith("local")) { - totalCores = sparkConf.contains("spark.default.parallelism") ? 
- sparkConf.getInt("spark.default.parallelism", 1) : - hiveSparkClient.getDefaultParallelism(); - totalCores = Math.max(totalCores, numExecutors); - } else { - int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1); - totalCores = numExecutors * coresPerExecutor; - } - totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1); - - long memoryPerTaskInBytes = totalMemory / totalCores; - LOG.info("Hive on Spark application currently has number of executors: " + numExecutors - + ", total cores: " + totalCores + ", memory per executor: " - + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction); - return new ObjectPair(Long.valueOf(memoryPerTaskInBytes), - Integer.valueOf(totalCores)); + return SparkUtilities.getMemoryAndCores(LOG, hiveSparkClient.getSparkConf(), + hiveSparkClient.getExecutorCount(), hiveSparkClient.getDefaultParallelism()); } @Override @@ -258,12 +232,12 @@ public Path getHDFSSessionDir() throws IOException { return scratchDir; } - public static String makeSessionId() { + private static String makeSessionId() { return UUID.randomUUID().toString(); } - @VisibleForTesting - HiveSparkClient getHiveSparkClient() { + @Override + public HiveSparkClient getHiveSparkClient() { return hiveSparkClient; } -} +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionWorkSubmitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionWorkSubmitter.java new file mode 100644 index 0000000000..c50b21665e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionWorkSubmitter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark.session; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.SparkWork; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link SparkWorkSubmitter} that submits {@link SparkWork} objects using the + * {@link SparkSession#submit(DriverContext, SparkWork)} method.
+ */ +class SparkSessionWorkSubmitter implements SparkWorkSubmitter { + + private static final Logger LOG = LoggerFactory.getLogger(SparkSessionWorkSubmitter.class); + + private final HiveConf conf; + + private SparkSessionManager sparkSessionManager; + private SparkSession sparkSession; + + + public SparkSessionWorkSubmitter(HiveConf conf) { + this.conf = conf; + } + + @Override + public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception { + this.sparkSessionManager = SparkSessionManagerImpl.getInstance(); + this.sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager); + return this.sparkSession.submit(driverContext, sparkWork); + } + + @Override + public int close(SparkTask sparkTask, int rc) { + if (sparkSession != null && sparkSessionManager != null) { + rc = sparkTask.close(rc); + try { + sparkSessionManager.returnSession(sparkSession); + } catch (HiveException ex) { + LOG.error("Failed to return the session to SessionManager", ex); + } + } + return rc; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitter.java new file mode 100644 index 0000000000..7f941b9511 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark.session; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.plan.SparkWork; + + +/** + * Defines an API for submitting a {@link SparkWork} to be executed as a Spark job. + */ +public interface SparkWorkSubmitter { + + SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception; + + int close(SparkTask sparkTask, int rc); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitterFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitterFactory.java new file mode 100644 index 0000000000..8a6d51cef4 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitterFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark.session; + +import org.apache.hadoop.hive.conf.HiveConf; + + +/** + * Creates {@link SparkWorkSubmitter}s. + */ +public class SparkWorkSubmitterFactory { + + public static SparkWorkSubmitter getSparkWorkSubmitter(HiveConf hiveConf) { + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE)) { + return new SparkLocalClientWorkSubmitter(); + } else { + return new SparkSessionWorkSubmitter(hiveConf); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java index ab87c79c48..e02b0cc4cc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java @@ -24,7 +24,11 @@ import java.util.Set; import java.util.Stack; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hive.spark.client.RemoteDriver; +import org.apache.spark.SparkConf; +import org.apache.spark.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.ObjectPair; @@ -255,27 +259,7 @@ private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws Sem sparkMemoryAndCores = null; return; } - - SparkSessionManager sparkSessionManager = null; - SparkSession sparkSession = null; - try { - sparkSessionManager = SparkSessionManagerImpl.getInstance(); - sparkSession = SparkUtilities.getSparkSession( - context.getConf(), sparkSessionManager); - sparkMemoryAndCores = sparkSession.getMemoryAndCores(); - } catch (HiveException e) { - throw new SemanticException("Failed to get a Hive on Spark session", e); - } catch (Exception e) { - LOG.warn("Failed to get spark memory/core info, reducer parallelism may be inaccurate", e); - } finally { - if (sparkSession != null && sparkSessionManager != null) { - try { - sparkSessionManager.returnSession(sparkSession); - } catch (HiveException ex) { - LOG.error("Failed to return the session to SessionManager: " + ex, ex); - } - } - } + sparkMemoryAndCores = new SparkMemoryAndCoresFetcherFactory(context.getConf()) + .createSparkMemoryAndCoresFetcher().getSparkMemoryAndCores(); } - -} +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcher.java new file mode 100644 index 0000000000..97c2f34c3e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcher.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.spark; + +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +/** + * Defines an API for fetching the memory and cores of the current Spark application associated + * with the current Hive session. + */ +interface SparkMemoryAndCoresFetcher { + + ObjectPair getSparkMemoryAndCores() throws SemanticException; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcherFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcherFactory.java new file mode 100644 index 0000000000..5f1aa6ac02 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcherFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.spark; + +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * Creates {@link SparkMemoryAndCoresFetcher}s. + */ +public class SparkMemoryAndCoresFetcherFactory { + + private final HiveConf hiveConf; + + SparkMemoryAndCoresFetcherFactory(HiveConf hiveConf) { + this.hiveConf = hiveConf; + } + + public SparkMemoryAndCoresFetcher createSparkMemoryAndCoresFetcher() { + if (this.hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE)) { + return new SparkRemoteDriverMemoryAndCoresFetcher(this.hiveConf); + } else { + return new SparkSessionMemoryAndCoresFetcher(this.hiveConf); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkRemoteDriverMemoryAndCoresFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkRemoteDriverMemoryAndCoresFetcher.java new file mode 100644 index 0000000000..254067954b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkRemoteDriverMemoryAndCoresFetcher.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.spark; + +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; +import org.apache.hive.spark.client.RemoteDriver; + +import org.apache.spark.SparkConf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SparkRemoteDriverMemoryAndCoresFetcher implements SparkMemoryAndCoresFetcher { + + private static final Logger LOG = LoggerFactory.getLogger( + SparkRemoteDriverMemoryAndCoresFetcher.class); + + private final HiveConf hiveConf; + + public SparkRemoteDriverMemoryAndCoresFetcher(HiveConf hiveConf) { + this.hiveConf = hiveConf; + } + + @Override + public ObjectPair getSparkMemoryAndCores() { + int numExecutors = RemoteDriver.getInstance().sc().sc().getExecutorMemoryStatus().size() - 1; + int defaultParallelism = RemoteDriver.getInstance().sc().sc().defaultParallelism(); + SparkConf sparkConf = HiveSparkClientFactory.generateSparkConf( + HiveSparkClientFactory.initiateSparkConf(this.hiveConf, null)); + return SparkUtilities.getMemoryAndCores(LOG, sparkConf, numExecutors, defaultParallelism); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSessionMemoryAndCoresFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSessionMemoryAndCoresFetcher.java new file mode 100644 index 0000000000..86ef17a1dc --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSessionMemoryAndCoresFetcher.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer.spark; + +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SparkSessionMemoryAndCoresFetcher implements SparkMemoryAndCoresFetcher { + + private static final Logger LOG = LoggerFactory.getLogger( + SparkSessionMemoryAndCoresFetcher.class); + + private final HiveConf hiveConf; + + SparkSessionMemoryAndCoresFetcher(HiveConf hiveConf) { + this.hiveConf = hiveConf; + } + + @Override + public ObjectPair getSparkMemoryAndCores() throws SemanticException { + SparkSessionManager sparkSessionManager = null; + SparkSession sparkSession = null; + try { + sparkSessionManager = SparkSessionManagerImpl.getInstance(); + sparkSession = SparkUtilities.getSparkSession(this.hiveConf, sparkSessionManager); + return sparkSession.getMemoryAndCores(); + } catch (HiveException e) { + throw new SemanticException("Failed to get a Hive on Spark session", e); + } catch (Exception e) { + LOG.warn("Failed to get spark memory/core info, reducer parallelism may be inaccurate", e); + } finally { + if (sparkSession != null && sparkSessionManager != null) { + try { + sparkSessionManager.returnSession(sparkSession); + } catch (HiveException ex) { + LOG.error("Failed to return the session to SessionManager: " + ex, ex); + } + } + } + return null; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java index f8b6a97c91..36c6f852f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java @@ -121,7 +121,11 @@ public static CommandProcessor get(String[] cmd, @Nonnull HiveConf conf) throws if (isBlank(cmd[0])) { return null; } else { - return DriverFactory.newDriver(conf); + try { + return DriverFactory.newRemoteProcessDriver(conf); + } catch (IOException | HiveException e) { + throw new SQLException(e); + } } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 81864f5f67..d026278b17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hive.metastore.cache.CachedStore; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.RemoteProcessLauncherFactory; import org.apache.hadoop.hive.ql.MapRedStats; import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.Registry; @@ -315,6 +316,29 @@ private List cleanupItems = new LinkedList(); + private final AtomicBoolean launchedRemoteProcess = new AtomicBoolean(); + + /** + * Launch the {@link org.apache.hadoop.hive.ql.RemoteProcessLauncher} if it hasn't been + * launched yet. 
+ */ + public void launchRemoteProcess() throws IOException, HiveException { + if (!launchedRemoteProcess.get()) { + synchronized (launchedRemoteProcess) { + if (!launchedRemoteProcess.get()) { + LOG.debug("Creating remote process launcher"); + + getPerfLogger().PerfLogBegin("RemoteProcessLauncher", "launch"); + RemoteProcessLauncherFactory.getRemoteProcessLauncher(sessionConf).launch(); + getPerfLogger().PerfLogEnd("RemoteProcessLauncher", "launch"); + + LOG.debug("Remote process launcher successfully launched"); + Preconditions.checkState(launchedRemoteProcess.compareAndSet(false, true)); + } + } + } + } + public HiveConf getConf() { return sessionConf; } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 9a07fa1760..4dd0e35cda 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -157,7 +157,7 @@ private void setupSessionIO(SessionState sessionState) { public void prepare(QueryState queryState) throws HiveSQLException { setState(OperationState.RUNNING); try { - driver = DriverFactory.newDriver(queryState, getParentSession().getUserName(), queryInfo); + driver = DriverFactory.newRemoteProcessDriver(queryState, getParentSession().getUserName(), queryInfo); // Start the timer thread for cancelling the query when query timeout is reached // queryTimeout == 0 means no timeout diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java index ed9222cfec..27d6ce45cb 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java @@ -41,13 +41,13 @@ import java.io.Writer; import java.net.URI; import java.net.URL; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -77,7 +77,7 @@ * It uses this protocol to submit {@link Job}s to the {@link RemoteDriver}. *

*/ -abstract class AbstractSparkClient implements SparkClient { +public abstract class AbstractSparkClient implements SparkClient { private static final long serialVersionUID = 1L; @@ -228,6 +228,11 @@ public void cancel(String jobId) { protocol.cancel(jobId); } + @Override + public ClientProtocol getClientProtocol() { + return this.protocol; + } + private Future startDriver(final RpcServer rpcServer, final String clientId, final String secret) throws IOException { final String serverAddress = rpcServer.getAddress(); @@ -306,6 +311,21 @@ public void cancel(String jobId) { } } + String testTmpDir = System.getProperty("test.tmp.dir"); + if (testTmpDir != null) { + addDriverSystemProperty("test.tmp.dir", testTmpDir); + } + + String testTmpDirUri = System.getProperty("test.tmp.dir.uri"); + if (testTmpDirUri != null) { + addDriverSystemProperty("test.tmp.dir.uri", testTmpDirUri); + } + + String testSrcTables = System.getProperty("test.src.tables"); + if (testSrcTables != null) { + addDriverSystemProperty("test.src.tables", testSrcTables); + } + Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8); try { allProps.store(writer, "Spark Context configuration"); @@ -423,7 +443,15 @@ protected abstract void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyT protected abstract void addExecutorCores(String executorCores); - private class ClientProtocol extends BaseProtocol { + protected abstract void addDriverSystemProperty(String key, String value); + + public class ClientProtocol extends BaseProtocol { + + private final Map msgResponses; + + public ClientProtocol() { + this.msgResponses = Maps.newConcurrentMap(); + } JobHandleImpl submit(Job job, List> listeners) { final String jobId = UUID.randomUUID().toString(); @@ -525,6 +553,116 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { } } + // We define the protocol for the RemoteProcessDriver in the same class because the underlying + // RPC implementation only supports specifying a single RpcDispatcher and it doesn't support + // polymorphism + + public void startSession(byte[] hiveConfBytes) { + LOG.debug("Sending startSession request"); + driverRpc.call(new StartSession(hiveConfBytes)); + } + + public Exception run(String command, byte[] hiveConfBytes, String queryId) { + LOG.debug("Sending run command request for query id " + queryId); + return sendMessage(queryId, new RunCommand(command, hiveConfBytes, queryId)); + } + + public boolean getResults(String queryId, List res) { + LOG.debug("Sending get results request for query id " + queryId); + CommandResults commandResults = sendMessage(queryId, new GetResults(queryId)); + res.addAll(commandResults.res); + return commandResults.moreResults; + } + + public Exception compileAndRespond(String queryId, String command, byte[] hiveConfBytes) { + LOG.debug("Sending run command request for query id " + queryId); + return sendMessage(queryId, new CompileCommand(command, hiveConfBytes, queryId)); + } + + public Exception run(String queryId) { + LOG.debug("Sending run command request for query id " + queryId); + return sendMessage(queryId, new RunCommand(null, null, queryId)); + } + + public boolean hasResultSet(String queryId) { + LOG.debug("Sending hasResultSet request for queryId " + queryId); + return sendMessage(queryId, new HasResultSet(queryId)); + } + + public byte[] getSchema(String queryId) { + LOG.debug("Sending getSchema request for queryId " + queryId); + return sendMessage(queryId, new GetSchema(queryId)); + } + + public boolean 
isFetchingTable(String queryId) { + LOG.debug("Sending isFetchingTable request for queryId " + queryId); + return sendMessage(queryId, new IsFetchingTable(queryId)); + } + + public void closeDriver(String queryId) { + LOG.debug("Sending closeDriver request for queryId " + queryId); + driverRpc.call(new CloseDriverRequest(queryId)); + } + + public void destroyDriver(String queryId) { + LOG.debug("Sending destroyDriver request for queryId " + queryId); + driverRpc.call(new DestroyDriverRequest(queryId)); + } + + public byte[] getQueryDisplay(String queryId) { + LOG.debug("Sending getQueryDisplay request for queryId " + queryId); + return sendMessage(queryId, new GetQueryDisplayRequest(queryId)); + } + + /** + * Sends a message to the {@link RemoteDriver} via the Driver + * {@link org.apache.hive.spark.client.rpc.Rpc} and blocks until the results are received by + * the corresponding "handle" method. + */ + private T sendMessage(String queryId, Object msg) { + BlockingQueue msgResponse = new ArrayBlockingQueue<>(1); + Preconditions.checkState(msgResponses.putIfAbsent(queryId, msgResponse) == null); + driverRpc.call(msg); + T response; + try { + response = msgResponse.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + msgResponses.remove(queryId); + return response; + } + + private void handle(ChannelHandlerContext ctx, CommandResults msg) { + LOG.debug("Received command results response for query id " + msg.queryId); + handleMessageResponse(msg.queryId, msg); + } + + private void handle(ChannelHandlerContext ctx, CommandProcessorResponseMessage msg) { + LOG.debug("Received command processor response for query id " + msg.queryId); + handleMessageResponse(msg.queryId, msg.commandProcessorResponse); + } + + private void handle(ChannelHandlerContext ctx, HasResultSetResponse msg) { + LOG.debug("Received has result set response for query id " + msg.queryId); + handleMessageResponse(msg.queryId, msg.hasResultSet); + } + + private void handle(ChannelHandlerContext ctx, GetSchemaResponse msg) { + LOG.debug("Received has getSchema response for query id " + msg.queryId); + handleMessageResponse(msg.queryId, msg.schema); + } + + private void handle(ChannelHandlerContext ctx, IsFetchingTableResponse msg) { + LOG.debug("Received has isFetchingTable response for query id " + msg.queryId); + handleMessageResponse(msg.queryId, msg.isFetchingTableResponse); + } + + private void handleMessageResponse(String queryId, Object response) { + Preconditions.checkState(msgResponses.get(queryId).remainingCapacity() == 1); + msgResponses.get(queryId).add(response); + } + @Override protected String name() { return "HiveServer2 to Remote Spark Driver Connection"; diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java index 558ed80ee8..d7e85c614f 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java @@ -18,6 +18,7 @@ package org.apache.hive.spark.client; import java.io.Serializable; +import java.util.List; import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.client.rpc.RpcDispatcher; @@ -222,4 +223,170 @@ public String toString() { '}'; } } + + protected static class RunCommand implements Serializable { + + final String command; + final byte[] hiveConfBytes; + final String queryId; + + RunCommand(String command, byte[] hiveConfBytes, String 
queryId) { + this.command = command; + this.hiveConfBytes = hiveConfBytes; + this.queryId = queryId; + } + } + + protected static class StartSession implements Serializable { + + final byte[] hiveConfBytes; + + StartSession(byte[] hiveConfBytes) { + this.hiveConfBytes = hiveConfBytes; + } + } + + protected static class HasResultSet implements Serializable { + + final String queryId; + + HasResultSet(String queryId) { + this.queryId = queryId; + } + } + + protected static class GetSchema implements Serializable { + + final String queryId; + + GetSchema(String queryId) { + this.queryId = queryId; + } + } + + protected static class IsFetchingTable implements Serializable { + + final String queryId; + + IsFetchingTable(String queryId) { + this.queryId = queryId; + } + } + + protected static class CloseDriverRequest implements Serializable { + + final String queryId; + + CloseDriverRequest(String queryId) { + this.queryId = queryId; + } + } + + protected static class DestroyDriverRequest implements Serializable { + + final String queryId; + + DestroyDriverRequest(String queryId) { + this.queryId = queryId; + } + } + + protected static class GetQueryDisplayRequest implements Serializable { + + final String queryId; + + GetQueryDisplayRequest(String queryId) { + this.queryId = queryId; + } + } + + protected static class GetSchemaResponse implements Serializable { + + final String queryId; + final byte[] schema; + + GetSchemaResponse(String queryId, byte[] schema) { + this.queryId = queryId; + this.schema = schema; + } + } + + protected static class HasResultSetResponse implements Serializable { + + final String queryId; + final boolean hasResultSet; + + HasResultSetResponse(String queryId, boolean hasResultSet) { + this.queryId = queryId; + this.hasResultSet = hasResultSet; + } + } + + protected static class IsFetchingTableResponse implements Serializable { + + final String queryId; + final boolean isFetchingTableResponse; + + IsFetchingTableResponse(String queryId, boolean isFetchingTableResponse) { + this.queryId = queryId; + this.isFetchingTableResponse = isFetchingTableResponse; + } + } + + protected static class QueryDisplayResponse implements Serializable { + + final String queryId; + final byte[] res; + + QueryDisplayResponse(String queryId, byte[] res) { + this.queryId = queryId; + this.res = res; + } + } + + protected static class CompileCommand implements Serializable { + + final String command; + final byte[] hiveConfBytes; + final String queryId; + + CompileCommand(String command, byte[] hiveConfBytes, String queryId) { + this.command = command; + this.hiveConfBytes = hiveConfBytes; + this.queryId = queryId; + } + } + + protected static class GetResults implements Serializable { + + final String queryId; + + GetResults(String queryId) { + this.queryId = queryId; + } + } + + protected static class CommandResults implements Serializable { + + final List res; + final String queryId; + final boolean moreResults; + + CommandResults(List res, String queryId, boolean moreResults) { + this.res = res; + this.queryId = queryId; + this.moreResults = moreResults; + } + } + + protected static class CommandProcessorResponseMessage implements Serializable { + + final String queryId; + final Exception commandProcessorResponse; + + CommandProcessorResponseMessage(String queryId, Exception commandProcessorResponse) { + this.queryId = queryId; + this.commandProcessorResponse = commandProcessorResponse; + } + } } diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java 
b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index 8130860f2b..d3ce0f5359 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -25,6 +25,8 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -71,6 +73,7 @@ public class RemoteDriver { private static final Logger LOG = LoggerFactory.getLogger(RemoteDriver.class); + private static RemoteDriver instance; private final Map> activeJobs; private final Object jcLock; @@ -83,7 +86,7 @@ private final File localTmpDir; // Used to queue up requests while the SparkContext is being created. - private final List> jobQueue = Lists.newLinkedList(); + private final List jobQueue = Lists.newLinkedList(); // jc is effectively final, but it has to be volatile since it's accessed by different // threads while the constructor is running. @@ -185,12 +188,16 @@ public String toString() { } synchronized (jcLock) { - for (Iterator> it = jobQueue.iterator(); it.hasNext();) { + for (Iterator it = jobQueue.iterator(); it.hasNext();) { it.next().submit(); } } } + public static RemoteDriver getInstance() { + return instance; + } + private void addShutdownHook() { Runtime.getRuntime().addShutdownHook(new Thread(() -> { if (running) { @@ -216,7 +223,7 @@ private void run() throws InterruptedException { } } - private void submit(JobWrapper job) { + private void submit(Submittable job) { synchronized (jcLock) { if (jc != null) { job.submit(); @@ -275,8 +282,16 @@ private String getArg(String[] args, int keyIdx) { return args[valIdx]; } + public JavaSparkContext sc() { + return this.jc.sc(); + } + private class DriverProtocol extends BaseProtocol { + private RemoteProcessDriverExecutorFactory remoteProcessDriverExecutorFactory; + private final Map commands = Maps.newConcurrentMap(); + private final ExecutorService es = Executors.newSingleThreadExecutor(); + Future sendError(Throwable error) { LOG.debug("Send error to Client: {}", Throwables.getStackTraceAsString(error)); return clientRpc.call(new Error(Throwables.getStackTraceAsString(error))); @@ -318,6 +333,7 @@ private void handle(ChannelHandlerContext ctx, CancelJob msg) { private void handle(ChannelHandlerContext ctx, EndSession msg) { LOG.debug("Shutting down due to EndSession request."); shutdown(null); + es.shutdownNow(); } private void handle(ChannelHandlerContext ctx, JobRequest msg) { @@ -356,13 +372,160 @@ public void call(JavaFutureAction future, } } + // We define the protocol for the RemoteProcessDriver in the same class because the underlying + // RPC implementation only supports specifying a single RpcDispatcher and it doesn't support + // polymorphism + + private void handle(ChannelHandlerContext ctx, RunCommand msg) { + LOG.debug("Received client run command request for query id " + msg.queryId); + + if (msg.command != null) { + submit(() -> { + es.submit(() -> { + RemoteProcessDriverExecutor remoteProcessDriverExecutor = remoteProcessDriverExecutorFactory.createRemoteProcessDriverExecutor( + msg.hiveConfBytes); + commands.put(msg.queryId, remoteProcessDriverExecutor); + Exception commandProcessorResponse = remoteProcessDriverExecutor.run(msg.command); + clientRpc.call(new CommandProcessorResponseMessage(msg + .queryId, commandProcessorResponse)); + }); + }); + } else { + 
submit(() -> { + es.submit(() -> { + Exception res = commands.get(msg.queryId).run(); + clientRpc.call(new CommandProcessorResponseMessage(msg + .queryId, res)); + }); + }); + } + } + + private void handle(ChannelHandlerContext ctx, CompileCommand msg) { + LOG.debug("Received client compile command request"); + + submit(() -> { + es.submit(() -> { + RemoteProcessDriverExecutor remoteProcessDriverExecutor = remoteProcessDriverExecutorFactory.createRemoteProcessDriverExecutor( + msg.hiveConfBytes); + commands.put(msg.queryId, remoteProcessDriverExecutor); + Exception commandProcessorResponse = remoteProcessDriverExecutor.compileAndRespond(msg + .command); + clientRpc.call(new CommandProcessorResponseMessage(msg + .queryId, commandProcessorResponse)); + }); + }); + } + + private void handle(ChannelHandlerContext ctx, GetResults msg) { + LOG.debug("Received client get results request"); + + submit(() -> { + es.submit(() -> { + List res = new ArrayList(); + try { + boolean moreResults = commands.get(msg.queryId).getResults(res); + clientRpc.call(new CommandResults(res, msg.queryId, moreResults)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }); + } + + private void handle(ChannelHandlerContext ctx, HasResultSet msg) { + LOG.debug("Received has result set request for query id " + msg.queryId); + + submit(() -> { + es.submit(() -> { + boolean res = commands.get(msg.queryId).hasResultSet(); + clientRpc.call(new HasResultSetResponse(msg.queryId, res)); + }); + }); + } + + private void handle(ChannelHandlerContext ctx, GetSchema msg) { + LOG.debug("Received has get schema request for query id " + msg.queryId); + + submit(() -> { + es.submit(() -> { + byte[] res = commands.get(msg.queryId).getSchema(); + clientRpc.call(new GetSchemaResponse(msg.queryId, res)); + }); + }); + } + + private void handle(ChannelHandlerContext ctx, IsFetchingTable msg) { + LOG.debug("Received is fetching table request for query id " + msg.queryId); + + submit(() -> { + es.submit(() -> { + boolean res = commands.get(msg.queryId).isFetchingTable(); + clientRpc.call(new IsFetchingTableResponse(msg.queryId, res)); + }); + }); + } + + private void handle(ChannelHandlerContext ctx, CloseDriverRequest msg) { + LOG.debug("Received has close driver request for query id " + msg.queryId); + + submit(() -> { + es.submit(() -> { + commands.get(msg.queryId).close(); + }); + }); + } + + private void handle(ChannelHandlerContext ctx, GetQueryDisplayRequest msg) { + LOG.debug("Received has getQueryDisplay request for query id " + msg.queryId); + + submit(() -> { + es.submit(() -> { + byte[] res = commands.get(msg.queryId).getQueryDisplay(); + clientRpc.call(new QueryDisplayResponse(msg.queryId, res)); + }); + }); + } + + private void handle(ChannelHandlerContext ctx, DestroyDriverRequest msg) { + LOG.debug("Received has destroy driver request for query id " + msg.queryId); + + submit(() -> { + es.submit(() -> { + commands.get(msg.queryId).destroy(); + }); + }); + } + + private void handle(ChannelHandlerContext ctx, StartSession msg) { + LOG.debug("Received start session request"); + + submit(() -> { + es.submit(() -> { + this.remoteProcessDriverExecutorFactory = createRemoteProcessDriverExecutorFactory( + msg.hiveConfBytes); + }); + }); + } + + private RemoteProcessDriverExecutorFactory createRemoteProcessDriverExecutorFactory(byte[] hiveConfBytes) { + try { + return (RemoteProcessDriverExecutorFactory) Class.forName("org.apache.hadoop.hive.ql.exec" + + 
".spark.RemoteProcessDriverExecutorFactoryImpl").getConstructor(byte[].class) + .newInstance(hiveConfBytes); + } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | + NoSuchMethodException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + @Override public String name() { return "Remote Spark Driver to HiveServer2 Connection"; } } - private class JobWrapper implements Callable { + private class JobWrapper implements Callable, Submittable { private final BaseProtocol.JobRequest req; private final List> jobs; @@ -443,7 +606,8 @@ public void call(JavaFutureAction future, return null; } - void submit() { + @Override + public void submit() { this.future = executor.submit(this); } @@ -557,6 +721,7 @@ private String getClientId(Integer jobId) { public static void main(String[] args) throws Exception { RemoteDriver rd = new RemoteDriver(args); + RemoteDriver.instance = rd; try { rd.run(); } catch (Exception e) { diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutor.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutor.java new file mode 100644 index 0000000000..6fa38937b5 --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.spark.client; + +import java.io.IOException; +import java.util.List; + +/** + * Executes driver methods inside a remote process. + */ +public interface RemoteProcessDriverExecutor { + + Exception run(String command); + + boolean getResults(List res) throws IOException; + + Exception run(); + + Exception compileAndRespond(String command); + + boolean hasResultSet(); + + byte[] getSchema(); + + boolean isFetchingTable(); + + void close(); + + void destroy(); + + byte[] getQueryDisplay(); +} diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutorFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutorFactory.java new file mode 100644 index 0000000000..0e512da6d2 --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutorFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.spark.client; + +/** + * Creates a new {@link RemoteProcessDriverExecutor} for a given + * {@link org.apache.hadoop.hive.conf.HiveConf}. + */ +public interface RemoteProcessDriverExecutorFactory { + + RemoteProcessDriverExecutor createRemoteProcessDriverExecutor(byte[] hiveConfBytes); +} diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java index 913889951e..2b66744fde 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java @@ -23,6 +23,7 @@ import java.util.concurrent.Future; import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hive.spark.client.rpc.Rpc; /** * Defines the API for the Spark remote client. @@ -117,4 +118,11 @@ * @param jobId the jobId to cancel */ void cancel(String jobId); + + /** + * Returns the client protocol associated with this client. The client protocol is an extension + * of {@link org.apache.hive.spark.client.rpc.RpcDispatcher} and defines the client side + * protocol for interacting with the {@link RemoteDriver}. + */ + AbstractSparkClient.ClientProtocol getClientProtocol(); } diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java index d45b77f0b8..1c123514c6 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java @@ -177,6 +177,11 @@ protected void addExecutorCores(String executorCores) { getSparkLauncher().addSparkArg("--executor-cores", executorCores); } + @Override + protected void addDriverSystemProperty(String key, String value) { + // TODO + } + private AbstractLauncher getSparkLauncher() { if (this.sparkLauncher == null) { this.sparkLauncher = new InProcessLauncher(); diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java index 1a524b9b69..96f638ab8c 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java @@ -155,6 +155,12 @@ protected void addExecutorCores(String executorCores) { argv.add(executorCores); } + @Override + protected void addDriverSystemProperty(String key, String value) { + argv.add("--driver-java-options"); + argv.add("\"-D" + key + "=" + value + "\""); + } + private String getSparkJobCredentialProviderPassword() { if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) { return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD"); diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/Submittable.java b/spark-client/src/main/java/org/apache/hive/spark/client/Submittable.java new file mode 
100644 index 0000000000..e3ae6ce16e --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/Submittable.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.spark.client; + +/** + * Defines a function that is "submittable" to some external runner, such as an + * {@link java.util.concurrent.ExecutorService}. + */ +public interface Submittable { + + void submit(); +}
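
Reviewer note: the sketch below is illustrative only and is not part of the patch. It shows how the new per-query client surface introduced in this diff (SparkClient#getClientProtocol, RemoteProcessHiveSparkClientImpl, and the remote-process driver messages) is expected to be driven from the HiveServer2 side once the remote driver has been launched for a session. The RemoteProcessClientUsageSketch class, the runQuery helper, the example query, and the serializeHiveConf call are assumptions introduced purely for illustration; everything else references only types and methods added by this patch.

package org.apache.hadoop.hive.ql.exec.spark;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hive.spark.client.SparkClient;

/**
 * Illustrative sketch (not part of this patch): drives a single statement through the
 * remote-process driver protocol defined above.
 */
public class RemoteProcessClientUsageSketch {

  public static void runQuery(SparkClient sparkClient, String queryId, HiveConf conf)
      throws Exception {
    // The ClientProtocol added in this patch carries the per-query RPCs to the RemoteDriver.
    RemoteProcessHiveSparkClient client =
        new RemoteProcessHiveSparkClientImpl(queryId, sparkClient.getClientProtocol());

    // Assumption: serializeHiveConf is the counterpart of the KryoSerializer.deserializeHiveConf
    // call used by RemoteProcessDriverExecutorFactoryImpl; the HiveConf travels as bytes.
    byte[] hiveConfBytes = KryoSerializer.serializeHiveConf(conf);

    // Compile and run the statement inside the remote driver process.
    CommandProcessorResponse response = client.run("SELECT COUNT(*) FROM src", hiveConfBytes);
    if (response.getResponseCode() == 0 && client.hasResultSet()) {
      List<Object> rows = new ArrayList<>();
      // Mirrors IDriver#getResults: each call appends a batch and reports whether more remain.
      while (client.getResults(rows)) {
        rows.clear();
      }
    }
    client.close();
  }
}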