commit f21a3f283ed7cf91d4fa60adeacfb65b2dd5861d
Author: Sahil Takiar
Date: Thu Jun 7 09:40:22 2018 -0500
HIVE-19821: Distributed HiveServer2
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 933bda4ad0..0f0554511b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3017,6 +3017,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
"The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."),
HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS("hive.server2.zookeeper.publish.configs", true,
"Whether we should publish HiveServer2's configs to ZooKeeper."),
+ HIVE_SERVER2_ENABLE_CONTAINER_SERVICE("hive.server2.enable.container.service", false,
+ "Whether to enable running queries for a session in a dedicated remote process."),
// HiveServer2 global init file location
HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}",
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSparkAndRemoteProcessDriver.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSparkAndRemoteProcessDriver.java
new file mode 100644
index 0000000000..7c2ba9520a
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSparkAndRemoteProcessDriver.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.HiveSessionHook;
+import org.apache.hive.service.cli.session.HiveSessionHookContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class is cloned from TestJdbcWithMiniMR, except that it uses Spark as the execution engine.
+ *
+ * This is copied from TestJdbcWithLocalClusterSpark with the following modifications:
+ * * Uses a remote metastore
+ * * Sets ObjectStore.setTwoMetastoreTesting(boolean) to true to prevent some "Persistence
+ * Manager has been closed" errors
+ * * Sets hive.security.authorization.manager back to its default value to avoid having to add a
+ * dependency on the itests jars
+ */
+public class TestJdbcWithLocalClusterSparkAndRemoteProcessDriver {
+ public static final String TEST_TAG = "miniHS2.localClusterSpark.tag";
+ public static final String TEST_TAG_VALUE = "miniHS2.localClusterSpark.value";
+ public static class LocalClusterSparkSessionHook implements HiveSessionHook {
+ @Override
+ public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException {
+ sessionHookContext.getSessionConf().set(TEST_TAG, TEST_TAG_VALUE);
+ }
+ }
+
+ private static MiniHS2 miniHS2 = null;
+ private static HiveConf conf;
+ private static Path dataFilePath;
+ private static String dbName = "mrTestDb";
+ private Connection hs2Conn = null;
+ private Statement stmt;
+
+ private static HiveConf createHiveConf() {
+ HiveConf conf = new HiveConf();
+ conf.set("hive.execution.engine", "spark");
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.master", "local-cluster[2,2,1024]");
+ conf.set("hive.spark.client.connect.timeout", "30000ms");
+ // FIXME: Hadoop3 made an incompatible change to dfs.client.datanode-restart.timeout,
+ // while Spark2 still uses Hadoop2.
+ // Spark requires Hive to support Hadoop3 first; then Spark can start
+ // working on Hadoop3 support. Remove this after Spark supports Hadoop3.
+ conf.set("dfs.client.datanode-restart.timeout", "30");
+ conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+ "TestJdbcWithLocalClusterSpark-local-dir").toString());
+ conf.set("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ conf.set("hive.server2.enable.container.service", "true");
+ return conf;
+ }
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ Class.forName(MiniHS2.getJdbcDriverName());
+ conf = createHiveConf();
+ conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ String dataFileDir = conf.get("test.data.files").replace('\\', '/')
+ .replace("c:", "");
+ dataFilePath = new Path(dataFileDir, "kv1.txt");
+ DriverManager.setLoginTimeout(0);
+ conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ miniHS2 = new MiniHS2.Builder().withRemoteMetastore().withConf(conf).withMiniMR().build();
+ Map<String, String> overlayProps = new HashMap<String, String>();
+ overlayProps.put(ConfVars.HIVE_SERVER2_SESSION_HOOK.varname,
+ LocalClusterSparkSessionHook.class.getName());
+ miniHS2.start(overlayProps);
+ ObjectStore.setTwoMetastoreTesting(true);
+ createDb();
+ }
+
+ // setup DB
+ private static void createDb() throws Exception {
+ Connection conn = DriverManager.
+ getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+ Statement stmt2 = conn.createStatement();
+ stmt2.execute("DROP DATABASE IF EXISTS " + dbName + " CASCADE");
+ stmt2.execute("CREATE DATABASE " + dbName);
+ stmt2.close();
+ conn.close();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(dbName),
+ System.getProperty("user.name"), "bar");
+ stmt = hs2Conn.createStatement();
+ stmt.execute("USE " + dbName);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (hs2Conn != null) {
+ hs2Conn.close();
+ }
+ }
+
+ @AfterClass
+ public static void afterTest() throws Exception {
+ if (miniHS2 != null && miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+ }
+
+ /**
+ * Verify that the connection to HS2 with MiniMr is successful.
+ * @throws Exception
+ */
+ @Test
+ public void testConnection() throws Exception {
+ // the session hook should set the property
+ verifyProperty(TEST_TAG, TEST_TAG_VALUE);
+ }
+
+ /**
+ * Run non-Spark query.
+ * @throws Exception
+ */
+ @Test
+ public void testNonSparkQuery() throws Exception {
+ String tableName = "testTab1";
+ String resultVal = "val_238";
+ String queryStr = "SELECT * FROM " + tableName;
+
+ testKvQuery(tableName, queryStr, resultVal);
+ }
+
+ /**
+ * Run Spark query.
+ * @throws Exception
+ */
+ @Test
+ public void testSparkQuery() throws Exception {
+ String tableName = "testTab2";
+ String resultVal = "val_238";
+ String queryStr = "SELECT * FROM " + tableName
+ + " where value = '" + resultVal + "'";
+
+ testKvQuery(tableName, queryStr, resultVal);
+ }
+
+ @Test
+ public void testPermFunc() throws Exception {
+
+ // This test assumes the hive-contrib JAR has been built as part of the Hive build.
+ // Also dependent on the UDFExampleAdd class within that JAR.
+ String udfClassName = "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd";
+ String mvnRepo = System.getProperty("maven.local.repository");
+ String hiveVersion = System.getProperty("hive.version");
+ String jarFileName = "hive-contrib-" + hiveVersion + ".jar";
+ String[] pathParts = {
+ "org", "apache", "hive",
+ "hive-contrib", hiveVersion, jarFileName
+ };
+
+ // Create path to hive-contrib JAR on local filesystem
+ Path contribJarPath = new Path(mvnRepo);
+ for (String pathPart : pathParts) {
+ contribJarPath = new Path(contribJarPath, pathPart);
+ }
+ FileSystem localFs = FileSystem.getLocal(conf);
+ assertTrue("Hive contrib JAR exists at " + contribJarPath, localFs.exists(contribJarPath));
+
+ String hdfsJarPathStr = "hdfs:///" + jarFileName;
+ Path hdfsJarPath = new Path(hdfsJarPathStr);
+
+ // Copy JAR to DFS
+ FileSystem dfs = miniHS2.getDFS().getFileSystem();
+ dfs.copyFromLocalFile(contribJarPath, hdfsJarPath);
+ assertTrue("Verify contrib JAR copied to HDFS at " + hdfsJarPath, dfs.exists(hdfsJarPath));
+
+ // Register function
+ String queryStr = "CREATE FUNCTION example_add AS '" + udfClassName + "'"
+ + " USING JAR '" + hdfsJarPathStr + "'";
+ stmt.execute(queryStr);
+
+ // Call describe
+ ResultSet res;
+ res = stmt.executeQuery("DESCRIBE FUNCTION " + dbName + ".example_add");
+ checkForNotExist(res);
+
+ // Use UDF in query
+ String tableName = "testTab3";
+ setupKv1Tabs(tableName);
+ res = stmt.executeQuery("SELECT EXAMPLE_ADD(1, 2) FROM " + tableName + " LIMIT 1");
+ assertTrue("query has results", res.next());
+ assertEquals(3, res.getInt(1));
+ assertFalse("no more results", res.next());
+
+ // A new connection should be able to call describe/use function without issue
+ Connection conn2 = DriverManager.getConnection(miniHS2.getJdbcURL(dbName),
+ System.getProperty("user.name"), "bar");
+ Statement stmt2 = conn2.createStatement();
+ stmt2.execute("USE " + dbName);
+ res = stmt2.executeQuery("DESCRIBE FUNCTION " + dbName + ".example_add");
+ checkForNotExist(res);
+
+ res = stmt2.executeQuery("SELECT " + dbName + ".example_add(1, 1) FROM " + tableName + " LIMIT 1");
+ assertTrue("query has results", res.next());
+ assertEquals(2, res.getInt(1));
+ assertFalse("no more results", res.next());
+
+ stmt.execute("DROP TABLE " + tableName);
+ }
+
+ @Test
+ public void testTempTable() throws Exception {
+ // Create temp table with current connection
+ String tempTableName = "tmp1";
+ stmt.execute("CREATE TEMPORARY TABLE " + tempTableName + " (key string, value string)");
+ stmt.execute("load data local inpath '"
+ + dataFilePath.toString() + "' into table " + tempTableName);
+
+ String resultVal = "val_238";
+ String queryStr = "SELECT * FROM " + tempTableName
+ + " where value = '" + resultVal + "'";
+ verifyResult(queryStr, resultVal, 2);
+
+ // A second connection should not be able to see the table
+ Connection conn2 = DriverManager.getConnection(miniHS2.getJdbcURL(dbName),
+ System.getProperty("user.name"), "bar");
+ Statement stmt2 = conn2.createStatement();
+ stmt2.execute("USE " + dbName);
+ boolean gotException = false;
+ try {
+ stmt2.executeQuery(queryStr);
+ } catch (SQLException err) {
+ // This is expected to fail.
+ assertTrue("Expecting table not found error, instead got: " + err,
+ err.getMessage().contains("Table not found"));
+ gotException = true;
+ }
+ assertTrue("Exception while querying non-existing temp table", gotException);
+ }
+
+ private void checkForNotExist(ResultSet res) throws Exception {
+ int numRows = 0;
+ while (res.next()) {
+ numRows++;
+ String strVal = res.getString(1);
+ assertEquals("Should not find 'not exist'", -1, strVal.toLowerCase().indexOf("not exist"));
+ }
+ assertTrue("Rows returned from describe function", numRows > 0);
+ }
+
+ /**
+ * Verify if the given property contains the expected value.
+ * @param propertyName
+ * @param expectedValue
+ * @throws Exception
+ */
+ private void verifyProperty(String propertyName, String expectedValue) throws Exception {
+ Statement stmt = hs2Conn.createStatement();
+ ResultSet res = stmt.executeQuery("set " + propertyName);
+ assertTrue(res.next());
+ String[] results = res.getString(1).split("=");
+ assertEquals("Property should be set", results.length, 2);
+ assertEquals("Property should be set", expectedValue, results[1]);
+ }
+
+ // create tables, verify query
+ private void testKvQuery(String tableName, String queryStr, String resultVal)
+ throws SQLException {
+ setupKv1Tabs(tableName);
+ verifyResult(queryStr, resultVal, 2);
+ stmt.execute("DROP TABLE " + tableName);
+ }
+
+ // create table and populate with kv1.txt
+ private void setupKv1Tabs(String tableName) throws SQLException {
+ Statement stmt = hs2Conn.createStatement();
+ // create table
+ stmt.execute("CREATE TABLE " + tableName
+ + " (under_col INT COMMENT 'the under column', value STRING)"
+ + " COMMENT ' test table'");
+
+ // load data
+ stmt.execute("load data local inpath '"
+ + dataFilePath.toString() + "' into table " + tableName);
+ }
+
+ // run given query and validate expected result
+ private void verifyResult(String queryStr, String expString, int colPos)
+ throws SQLException {
+ ResultSet res = stmt.executeQuery(queryStr);
+ assertTrue(res.next());
+ assertEquals(expString, res.getString(colPos));
+ res.close();
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSparkAndRemoteProcessDriver.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSparkAndRemoteProcessDriver.java
new file mode 100644
index 0000000000..d757cd0771
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSparkAndRemoteProcessDriver.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.HiveSessionHook;
+import org.apache.hive.service.cli.session.HiveSessionHookContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ * This is copied from TestMultiSessionsHS2WithLocalClusterSpark with the following modifications:
+ * * Uses a remote metastore
+ * * Sets ObjectStore.setTwoMetastoreTesting(boolean) to true to prevent some "Persistence
+ * Manager has been closed" errors
+ * * Sets hive.security.authorization.manager back to its default value to avoid having to add a
+ * dependency on the itests jars
+ */
+public class TestMultiSessionsHS2WithLocalClusterSparkAndRemoteProcessDriver {
+ public static final String TEST_TAG = "miniHS2.localClusterSpark.tag";
+ public static final String TEST_TAG_VALUE = "miniHS2.localClusterSpark.value";
+ private static final int PARALLEL_NUMBER = 3;
+
+ public static class LocalClusterSparkSessionHook implements HiveSessionHook {
+ @Override
+ public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException {
+ sessionHookContext.getSessionConf().set(TEST_TAG, TEST_TAG_VALUE);
+ }
+ }
+
+ private static MiniHS2 miniHS2 = null;
+ private static HiveConf conf;
+ private static Path dataFilePath;
+ private static String dbName = "sparkTestDb";
+ private ThreadLocal<Connection> localConnection = new ThreadLocal<Connection>();
+ private ThreadLocal<Statement> localStatement = new ThreadLocal<Statement>();
+ private ExecutorService pool = null;
+
+
+ private static HiveConf createHiveConf() {
+ HiveConf conf = new HiveConf();
+ conf.set("hive.exec.parallel", "true");
+ conf.set("hive.execution.engine", "spark");
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.master", "local-cluster[2,2,1024]");
+ conf.set("spark.deploy.defaultCores", "2");
+ conf.set("hive.spark.client.connect.timeout", "30000ms");
+ // FIXME: Hadoop3 made an incompatible change to dfs.client.datanode-restart.timeout,
+ // while Spark2 still uses Hadoop2.
+ // Spark requires Hive to support Hadoop3 first; then Spark can start
+ // working on Hadoop3 support. Remove this after Spark supports Hadoop3.
+ conf.set("dfs.client.datanode-restart.timeout", "30");
+ conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+ "TestMultiSessionsHS2WithLocalClusterSpark-local-dir").toString());
+ conf.set("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ conf.set("hive.server2.enable.container.service", "true");
+ return conf;
+ }
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ Class.forName(MiniHS2.getJdbcDriverName());
+ conf = createHiveConf();
+ conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ String dataFileDir = conf.get("test.data.files").replace('\\', '/')
+ .replace("c:", "");
+ dataFilePath = new Path(dataFileDir, "kv1.txt");
+ DriverManager.setLoginTimeout(0);
+ conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ miniHS2 = new MiniHS2.Builder().withRemoteMetastore().withConf(conf).withMiniMR().build();
+ Map<String, String> overlayProps = new HashMap<String, String>();
+ overlayProps.put(ConfVars.HIVE_SERVER2_SESSION_HOOK.varname,
+ LocalClusterSparkSessionHook.class.getName());
+ miniHS2.start(overlayProps);
+ ObjectStore.setTwoMetastoreTesting(true);
+ createDb();
+ }
+
+ // setup DB
+ private static void createDb() throws Exception {
+ Connection conn = DriverManager.
+ getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+ Statement stmt2 = conn.createStatement();
+ stmt2.execute("DROP DATABASE IF EXISTS " + dbName + " CASCADE");
+ stmt2.execute("CREATE DATABASE " + dbName);
+ stmt2.close();
+ conn.close();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ pool = Executors.newFixedThreadPool(PARALLEL_NUMBER,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Test-Thread-%d").build());
+ createConnection();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ pool.shutdownNow();
+ closeConnection();
+ }
+
+ private void createConnection() throws Exception {
+ Connection connection = DriverManager.getConnection(miniHS2.getJdbcURL(dbName),
+ System.getProperty("user.name"), "bar");
+ Statement statement = connection.createStatement();
+ localConnection.set(connection);
+ localStatement.set(statement);
+ statement.execute("USE " + dbName);
+ }
+
+ private void closeConnection() throws SQLException {
+ if (localStatement.get() != null) {
+ localStatement.get().close();
+ }
+
+ if (localConnection.get() != null) {
+ localConnection.get().close();
+ }
+ }
+
+ @AfterClass
+ public static void afterTest() throws Exception {
+ if (miniHS2 != null && miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+ }
+
+ /**
+ * Run non-Spark query.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNonSparkQuery() throws Exception {
+ String tableName = "kvTable1";
+ setupTable(tableName);
+ Callable<Void> runNonSparkQuery = getNonSparkQueryCallable(tableName);
+ runInParallel(runNonSparkQuery);
+ dropTable(tableName);
+ }
+
+ /**
+ * Run Spark query.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSparkQuery() throws Exception {
+ String tableName = "kvTable2";
+ setupTable(tableName);
+ Callable<Void> runSparkQuery = getSparkQueryCallable(tableName);
+ runInParallel(runSparkQuery);
+ dropTable(tableName);
+ }
+
+ private void runInParallel(Callable<Void> queryCallable) throws InterruptedException, ExecutionException {
+ List<Future<Void>> futureList = new LinkedList<Future<Void>>();
+ for (int i = 0; i < PARALLEL_NUMBER; i++) {
+ Future<Void> future = pool.submit(queryCallable);
+ futureList.add(future);
+ }
+
+ for (Future<Void> future : futureList) {
+ future.get();
+ }
+ }
+
+ private Callable<Void> getNonSparkQueryCallable(final String tableName) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ String resultVal = "val_238";
+ String queryStr = "SELECT * FROM " + tableName;
+ testKvQuery(queryStr, resultVal);
+ return null;
+ }
+ };
+ }
+
+ private Callable<Void> getSparkQueryCallable(final String tableName) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ String resultVal = "val_238";
+ String queryStr = "SELECT * FROM " + tableName +
+ " where value = '" + resultVal + "'";
+ testKvQuery(queryStr, resultVal);
+ return null;
+ }
+ };
+ }
+
+ private void testKvQuery(String queryStr, String resultVal)
+ throws Exception {
+ createConnection();
+ verifyResult(queryStr, resultVal, 2);
+ closeConnection();
+ }
+
+ // create table and load kv1.txt
+ private void setupTable(String tableName) throws SQLException {
+ Statement statement = localStatement.get();
+ // create table
+ statement.execute("CREATE TABLE " + tableName
+ + " (under_col INT COMMENT 'the under column', value STRING)"
+ + " COMMENT ' test table'");
+
+ // load data
+ statement.execute("LOAD DATA LOCAL INPATH '"
+ + dataFilePath.toString() + "' INTO TABLE " + tableName);
+ }
+
+ private void dropTable(String tableName) throws SQLException {
+ localStatement.get().execute("DROP TABLE " + tableName);
+ }
+
+ // run given query and validate expected result
+ private void verifyResult(String queryStr, String expString, int colPos)
+ throws SQLException {
+ ResultSet res = localStatement.get().executeQuery(queryStr);
+ assertTrue(res.next());
+ assertEquals(expString, res.getString(colPos));
+ res.close();
+ }
+}
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 2106fec7af..2454fe8272 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -1214,7 +1214,7 @@ public void init() throws Exception {
db.getConf().set("hive.server2.materializedviews.registry.impl", "DUMMY");
HiveMaterializedViewsRegistry.get().init(db);
db.getConf().set("hive.server2.materializedviews.registry.impl", registryImpl);
- drv = DriverFactory.newDriver(conf);
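+ // Falls back to a plain Driver unless hive.server2.enable.container.service is enabled.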
+ drv = DriverFactory.newRemoteProcessDriver(conf);
pd = new ParseDriver();
sem = new SemanticAnalyzer(queryState);
}
diff --git a/ql/pom.xml b/ql/pom.xml
index 0c181e515c..655f30ab96 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -946,6 +946,21 @@
org.apache.orc:orc-shims
org.apache.orc:orc-tools
joda-time:joda-time
+ org.apache.logging.log4j:log4j-api
+ org.apache.calcite.avatica:avatica
+ org.apache.calcite:calcite-core
+ org.apache.calcite:calcite-linq4j
+ org.apache.calcite:calcite-druid
+ org.apache.thrift:libfb303
+ org.datanucleus:datanucleus-core
+ org.datanucleus:datanucleus-api-jdo
+ org.datanucleus:datanucleus-rdbms
+ org.datanucleus:javax.jdo
+ commons-pool:commons-pool
+ commons-dbcp:commons-dbcp
+ org.antlr:antlr-runtime
+ javax.jdo:jdo-api
+ org.apache.hbase:hbase-common
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java
index 0f6a80ef0d..56d1a15db6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.hive.ql;
+import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.reexec.IReExecutionPlugin;
import org.apache.hadoop.hive.ql.reexec.ReExecDriver;
import org.apache.hadoop.hive.ql.reexec.ReExecutionOverlayPlugin;
@@ -34,6 +36,30 @@
*/
public class DriverFactory {
+ /**
+ * If {@link HiveConf.ConfVars#HIVE_SERVER2_ENABLE_CONTAINER_SERVICE} is true, return a
+ * {@link RemoteProcessDriver}, else delegate to {@link #newDriver(HiveConf)}.
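+ *
+ * <p>Illustrative usage (assuming the container service flag is enabled):
+ * <pre>{@code
+ * HiveConf conf = new HiveConf();
+ * conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE, true);
+ * IDriver driver = DriverFactory.newRemoteProcessDriver(conf);
+ * driver.run("SELECT 1");
+ * }</pre>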
+ */
+ public static IDriver newRemoteProcessDriver(HiveConf conf) throws IOException, HiveException {
+ if (conf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE)) {
+ return new RemoteProcessDriver(conf);
+ } else {
+ return newDriver(conf);
+ }
+ }
+
+ /**
+ * If {@link HiveConf.ConfVars#HIVE_SERVER2_ENABLE_CONTAINER_SERVICE} is true, return a
+ * {@link RemoteProcessDriver}, else delegate to {@link #newDriver(QueryState, String, QueryInfo)}.
+ */
+ public static IDriver newRemoteProcessDriver(QueryState queryState, String userName, QueryInfo queryInfo) throws IOException, HiveException {
+ if (queryState.getConf().getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE)) {
+ return new RemoteProcessDriver(queryState, userName, queryInfo);
+ } else {
+ return newDriver(queryState, userName, queryInfo);
+ }
+ }
+
public static IDriver newDriver(HiveConf conf) {
return newDriver(getNewQueryState(conf), null, null);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClient.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClient.java
new file mode 100644
index 0000000000..78b04b196d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClient.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * API that defines how to run {@link IDriver} methods in a remote process. Used by
+ * {@link RemoteProcessDriver}.
+ */
+interface RemoteProcessClient {
+
+ CommandProcessorResponse run(String statement) throws Exception;
+
+ boolean getResults(List res) throws IOException;
+
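+ // compileAndRespond(String) followed by run() mirrors IDriver's two-phase compile/execute flow used by HiveServer2.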
+ CommandProcessorResponse compileAndRespond(String statement);
+
+ CommandProcessorResponse run();
+
+ boolean hasResultSet();
+
+ Schema getSchema();
+
+ boolean isFetchingTable();
+
+ void close();
+
+ void destroy();
+
+ QueryDisplay getQueryDisplay();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClientFactory.java
new file mode 100644
index 0000000000..e1d24c6ab6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessClientFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.RemoteProcessHiveSparkClientImpl;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * Creates a {@link RemoteProcessClient} for a {@link RemoteProcessDriver}.
+ */
+public class RemoteProcessClientFactory {
+
+ public static RemoteProcessClient createRemoteProcessClient(HiveConf hiveConf, String queryId) {
+
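+ // Only the Spark engine currently has a remote-process client; it reuses the session's
+ // RemoteHiveSparkClient RPC protocol as the transport to the remote driver.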
+ if ("spark".equals(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+ SparkSession sparkSession;
+ try {
+ sparkSession = SparkUtilities.getSparkSession(hiveConf,
+ SparkSessionManagerImpl.getInstance());
+ } catch (HiveException e) {
+ throw new RuntimeException(e);
+ }
+ if (!(sparkSession.getHiveSparkClient() instanceof RemoteHiveSparkClient)) {
+ throw new IllegalArgumentException("Expected a RemoteHiveSparkClient, but found "
+ + sparkSession.getHiveSparkClient().getClass().getName());
+ }
+ RemoteHiveSparkClient remoteHiveSparkClient = (RemoteHiveSparkClient) sparkSession.getHiveSparkClient();
+ return new SparkRemoteProcessClient(queryId, hiveConf, new RemoteProcessHiveSparkClientImpl(queryId, remoteHiveSparkClient.getSparkClient().getClientProtocol()));
+ }
+ throw new IllegalArgumentException("Remote process execution is only supported when hive.execution.engine is set to spark");
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessDriver.java
new file mode 100644
index 0000000000..eba396ae8a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessDriver.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * Runs an {@link IDriver} in a remote process.
+ */
+public class RemoteProcessDriver implements IDriver {
+
+ private final HiveConf hiveConf;
+ private final QueryState queryState;
+ private final String userName;
+ private final QueryInfo queryInfo;
+ private final RemoteProcessClient remoteProcessClient;
+
+ public RemoteProcessDriver(HiveConf hiveConf) throws IOException, HiveException {
+ this(new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(hiveConf).build(), null,
+ null);
+ }
+
+ public RemoteProcessDriver(QueryState queryState, String userName, QueryInfo queryInfo) throws IOException, HiveException {
+ this.hiveConf = queryState.getConf();
+ this.queryState = queryState;
+ this.userName = userName;
+ this.queryInfo = queryInfo;
+ this.remoteProcessClient = createRemoteProcessClient(queryState.getConf(), queryState.getQueryId());
+ }
+
+ @Override
+ public int compile(String string) {
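+ // No-op locally; compilation happens in the remote process via compileAndRespond(String).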
+ return 0;
+ }
+
+ @Override
+ public CommandProcessorResponse compileAndRespond(String statement) {
+ return this.remoteProcessClient.compileAndRespond(statement);
+ }
+
+ @Override
+ public QueryPlan getPlan() {
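+ // No local QueryPlan; the plan only exists in the remote process.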
+ return null;
+ }
+
+ @Override
+ public QueryDisplay getQueryDisplay() {
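+ // Return a stub QueryDisplay; task-level details live in the remote process and can be
+ // fetched through the RemoteProcessClient.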
+ return new QueryDisplay() {
+ @Override
+ public synchronized List<TaskDisplay> getTaskDisplays() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public void setOperationId(String guid64) {
+
+ }
+
+ @Override
+ public CommandProcessorResponse run() {
+ return this.remoteProcessClient.run();
+ }
+
+ @Override
+ public CommandProcessorResponse run(String command) {
+ try {
+ return this.remoteProcessClient.run(command);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean getResults(List res) throws IOException {
+ return this.remoteProcessClient.getResults(res);
+ }
+
+ @Override
+ public void setMaxRows(int maxRows) {
+
+ }
+
+ @Override
+ public FetchTask getFetchTask() {
+ return null;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return this.remoteProcessClient.getSchema();
+ }
+
+ @Override
+ public boolean isFetchingTable() {
+ return this.remoteProcessClient.isFetchingTable();
+ }
+
+ @Override
+ public void resetFetch() {
+
+ }
+
+ @Override
+ public void close() {
+ this.remoteProcessClient.close();
+ }
+
+ @Override
+ public void destroy() {
+ this.remoteProcessClient.destroy();
+ }
+
+ @Override
+ public HiveConf getConf() {
+ return null;
+ }
+
+ /**
+ * Getting the {@link Context} is not supported because it would require serializing the entire
+ * context object. This method is mostly used by the {@link org.apache.hadoop.hive.ql.reexec.ReExecDriver}
+ * and various unit tests.
+ */
+ @Override
+ public Context getContext() {
+ throw new UnsupportedOperationException(
+ "RemoteProcessDriver does not support getting the Semantic Analyzer Context");
+ }
+
+ @Override
+ public boolean hasResultSet() {
+ return this.remoteProcessClient.hasResultSet();
+ }
+
+ private RemoteProcessClient createRemoteProcessClient(HiveConf hiveConf,
+ String queryId) throws IOException, HiveException {
+ SessionState.get().launchRemoteProcess();
+ return RemoteProcessClientFactory.createRemoteProcessClient(hiveConf, queryId);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncher.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncher.java
new file mode 100644
index 0000000000..907859b225
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.io.IOException;
+
+
+/**
+ * Launches the remote process that will be used by the {@link RemoteProcessDriver}.
+ */
+public interface RemoteProcessLauncher {
+
+ void launch() throws IOException, HiveException;
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncherFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncherFactory.java
new file mode 100644
index 0000000000..0661821b2c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/RemoteProcessLauncherFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+
+/**
+ * Creates a {@link RemoteProcessLauncher}.
+ */
+public class RemoteProcessLauncherFactory {
+
+ public static RemoteProcessLauncher getRemoteProcessLauncher(HiveConf hiveConf) {
+ if ("spark".equals(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+ return new SparkRemoteProcessLauncher(hiveConf);
+ }
+ throw new IllegalArgumentException("Remote process execution is only supported when hive.execution.engine is set to spark");
+ }
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessClient.java b/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessClient.java
new file mode 100644
index 0000000000..7259038768
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessClient.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer;
+import org.apache.hadoop.hive.ql.exec.spark.RemoteProcessHiveSparkClient;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * A {@link RemoteProcessClient} that uses a Spark driver to run all remote driver operations. It
+ * uses a {@link RemoteProcessHiveSparkClient} to interact with the remote driver.
+ */
+class SparkRemoteProcessClient implements RemoteProcessClient {
+
+ private final String queryId;
+ private final HiveConf hiveConf;
+ private final RemoteProcessHiveSparkClient remoteProcessHiveSparkClient;
+ private PerfLogger perfLogger;
+
+ SparkRemoteProcessClient(String queryId, HiveConf hiveConf,
+ RemoteProcessHiveSparkClient remoteProcessHiveSparkClient) {
+ this.queryId = queryId;
+ this.hiveConf = hiveConf;
+ this.remoteProcessHiveSparkClient = remoteProcessHiveSparkClient;
+ this.perfLogger = SessionState.getPerfLogger();
+ }
+
+ @Override
+ public CommandProcessorResponse run(String statement) {
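+ // Ship the current HiveConf with every command so the remote driver picks up the latest session settings.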
+ this.perfLogger.PerfLogBegin(getClass().getSimpleName(), "serializeHiveConf");
+ byte[] hiveConfBytes = KryoSerializer.serializeHiveConf(hiveConf);
+ this.perfLogger.PerfLogEnd(getClass().getSimpleName(), "serializeHiveConf");
+
+ return this.remoteProcessHiveSparkClient.run(statement, hiveConfBytes);
+ }
+
+ @Override
+ public boolean getResults(List res) throws IOException {
+ return this.remoteProcessHiveSparkClient.getResults(res);
+ }
+
+ @Override
+ public CommandProcessorResponse compileAndRespond(String statement) {
+ this.perfLogger.PerfLogBegin(getClass().getSimpleName(), "serializeHiveConf");
+ byte[] hiveConfBytes = KryoSerializer.serializeHiveConf(hiveConf);
+ this.perfLogger.PerfLogEnd(getClass().getSimpleName(), "serializeHiveConf");
+ return this.remoteProcessHiveSparkClient.compileAndRespond(statement, hiveConfBytes);
+ }
+
+ @Override
+ public CommandProcessorResponse run() {
+ return this.remoteProcessHiveSparkClient.run();
+ }
+
+ @Override
+ public boolean hasResultSet() {
+ return this.remoteProcessHiveSparkClient.hasResultSet();
+ }
+
+ @Override
+ public Schema getSchema() {
+ return this.remoteProcessHiveSparkClient.getSchema();
+ }
+
+ @Override
+ public boolean isFetchingTable() {
+ return this.remoteProcessHiveSparkClient.isFetchingTable();
+ }
+
+ @Override
+ public void close() {
+ this.remoteProcessHiveSparkClient.close();
+ }
+
+ @Override
+ public void destroy() {
+ this.remoteProcessHiveSparkClient.destroy();
+ }
+
+ @Override
+ public QueryDisplay getQueryDisplay() {
+ return this.remoteProcessHiveSparkClient.getQueryDisplay();
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessLauncher.java b/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessLauncher.java
new file mode 100644
index 0000000000..8dddc9785b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/SparkRemoteProcessLauncher.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer;
+import org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+
+/**
+ * A {@link RemoteProcessLauncher} that defines the remote process as the Spark driver. It uses
+ * the existing {@link SparkSession} and {@link org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager}
+ * logic to launch the {@link org.apache.hive.spark.client.RemoteDriver}.
+ */
+class SparkRemoteProcessLauncher implements RemoteProcessLauncher {
+
+ private final HiveConf hiveConf;
+
+ SparkRemoteProcessLauncher(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ }
+
+ @Override
+ public void launch() throws HiveException {
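+ // Getting the SparkSession launches the RemoteDriver process if one is not already running;
+ // startSession then ships the serialized HiveConf so the remote side can initialize its SessionState.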
+ SparkSession ss = SparkUtilities.getSparkSession(hiveConf, SparkSessionManagerImpl.getInstance());
+ byte[] hiveConfBytes = KryoSerializer.serializeHiveConf(hiveConf);
+ ((RemoteHiveSparkClient) ss.getHiveSparkClient()).getSparkClient().getClientProtocol().startSession(hiveConfBytes);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 5ed5d4214e..e7d920aa43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -251,7 +251,7 @@ private static void addCredentialProviderPassword(Map<String, String> sparkConf,
sparkConf.put("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword);
}
- static SparkConf generateSparkConf(Map<String, String> conf) {
+ public static SparkConf generateSparkConf(Map<String, String> conf) {
SparkConf sparkConf = new SparkConf(false);
for (Map.Entry<String, String> entry : conf.entrySet()) {
sparkConf.set(entry.getKey(), entry.getValue());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
index f77e92cd2a..446f746e42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
@@ -24,6 +24,7 @@
import java.io.IOException;
import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
@@ -64,6 +65,25 @@
return result;
}
+ public static byte[] serializeHiveConf(HiveConf hiveConf) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ hiveConf.write(new DataOutputStream(out));
+ } catch (IOException e) {
+ LOG.error("Error serializing job configuration: " + e, e);
+ return null;
+ } finally {
+ try {
+ out.close();
+ } catch (IOException e) {
+ LOG.error("Error closing output stream: " + e, e);
+ }
+ }
+
+ return out.toByteArray();
+
+ }
+
public static byte[] serializeJobConf(JobConf jobConf) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
@@ -94,4 +114,14 @@ public static JobConf deserializeJobConf(byte[] buffer) {
return conf;
}
+ public static HiveConf deserializeHiveConf(byte[] buffer) {
+ HiveConf conf = new HiveConf();
+ try {
+ conf.readFields(new DataInputStream(new ByteArrayInputStream(buffer)));
+ } catch (IOException e) {
+ String msg = "Error de-serializing job configuration: " + e;
+ throw new IllegalStateException(msg, e);
+ }
+ return conf;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 72ff53e3bd..b04dc6d35f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -26,6 +26,7 @@
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.DagUtils;
+import org.apache.hive.spark.client.SparkClient;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.spark.util.CallSite;
import org.slf4j.Logger;
@@ -88,6 +89,11 @@ public static synchronized LocalHiveSparkClient getInstance(
private final JobMetricsListener jobMetricsListener;
+ public LocalHiveSparkClient(JavaSparkContext sc, JobMetricsListener jobMetricsListener) {
+ this.sc = sc;
+ this.jobMetricsListener = jobMetricsListener;
+ }
+
private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
throws FileNotFoundException, MalformedURLException {
String regJar = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index d31a202261..e990d50322 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -180,6 +180,10 @@ public int getDefaultParallelism() throws Exception {
return handler.get(sparkClientTimtout, TimeUnit.SECONDS);
}
+ public SparkClient getSparkClient() {
+ return remoteClient;
+ }
+
@Override
public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork)
throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessDriverExecutorFactoryImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessDriverExecutorFactoryImpl.java
new file mode 100644
index 0000000000..e92d23891b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessDriverExecutorFactoryImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.spark.client.RemoteProcessDriverExecutor;
+import org.apache.hive.spark.client.RemoteProcessDriverExecutorFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * A simple implementation of {@link RemoteProcessDriverExecutorFactory} that creates
+ * {@link RemoteProcessDriverExecutor}s.
+ */
+public class RemoteProcessDriverExecutorFactoryImpl implements RemoteProcessDriverExecutorFactory {
+
+ private final HiveConf hiveConf;
+
+ public RemoteProcessDriverExecutorFactoryImpl(byte[] hiveConfBytes) {
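+ // Runs inside the remote process: deserialize the HiveConf shipped from HiveServer2 and start
+ // a SessionState so the drivers created by this factory can execute queries.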
+ this.hiveConf = KryoSerializer.deserializeHiveConf(hiveConfBytes);
+ SessionState.start(this.hiveConf);
+ }
+
+ @Override
+ public RemoteProcessDriverExecutor createRemoteProcessDriverExecutor(byte[] hiveConfBytes) {
+ return new RemoteProcessDriverExecutorImpl(hiveConfBytes);
+ }
+
+ /**
+ * A simple implementation of {@link RemoteProcessDriverExecutor} that delegates to the
+ * {@link IDriver} defined by {@link DriverFactory#newDriver(HiveConf)}.
+ */
+ private static final class RemoteProcessDriverExecutorImpl implements RemoteProcessDriverExecutor {
+
+ private HiveConf hiveConf;
+ private IDriver driver;
+
+ private RemoteProcessDriverExecutorImpl(byte[] hiveConfBytes) {
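+ // Each query gets its own HiveConf and a regular in-process IDriver that does the actual work.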
+ this.hiveConf = KryoSerializer.deserializeHiveConf(hiveConfBytes);
+ this.driver = DriverFactory.newDriver(this.hiveConf);
+ }
+
+ @Override
+ public Exception run(String command) {
+ return this.driver.run(command);
+ }
+
+ @Override
+ public boolean getResults(List res) throws IOException {
+ return this.driver.getResults(res);
+ }
+
+ @Override
+ public Exception run() {
+ return this.driver.run();
+ }
+
+ @Override
+ public Exception compileAndRespond(String command) {
+ return this.driver.compileAndRespond(command);
+ }
+
+ @Override
+ public boolean hasResultSet() {
+ return this.driver.hasResultSet();
+ }
+
+ @Override
+ public byte[] getSchema() {
+ return KryoSerializer.serialize(this.driver.getSchema());
+ }
+
+ @Override
+ public boolean isFetchingTable() {
+ return this.driver.isFetchingTable();
+ }
+
+ @Override
+ public void close() {
+ this.driver.close();
+ }
+
+ @Override
+ public void destroy() {
+ this.driver.destroy();
+ }
+
+ @Override
+ public byte[] getQueryDisplay() {
+ return KryoSerializer.serialize(this.driver.getQueryDisplay());
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClient.java
new file mode 100644
index 0000000000..5f477b58a8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClient.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.QueryDisplay;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Similar to {@link HiveSparkClient} except it defines client methods to be used by
+ * {@link org.apache.hadoop.hive.ql.SparkRemoteProcessClient}.
+ */
+public interface RemoteProcessHiveSparkClient extends Serializable {
+
+ CommandProcessorResponse run(String command, byte[] hiveConfBytes);
+
+ boolean getResults(List res) throws IOException;
+
+ CommandProcessorResponse compileAndRespond(String statement, byte[] hiveConfBytes);
+
+ CommandProcessorResponse run();
+
+ boolean hasResultSet();
+
+ Schema getSchema();
+
+ boolean isFetchingTable();
+
+ void close();
+
+ void destroy();
+
+ QueryDisplay getQueryDisplay();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClientImpl.java
new file mode 100644
index 0000000000..b1279e7ba6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteProcessHiveSparkClientImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.QueryDisplay;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hive.spark.client.AbstractSparkClient;
+
+import java.util.List;
+
+
+/**
+ * A {@link RemoteProcessHiveSparkClient} that uses {@link AbstractSparkClient.ClientProtocol} to
+ * run the necessary driver commands.
+ */
+public class RemoteProcessHiveSparkClientImpl implements RemoteProcessHiveSparkClient {
+
+
+ private final String queryId;
+ private final AbstractSparkClient.ClientProtocol clientProtocol;
+
+ public RemoteProcessHiveSparkClientImpl(String queryId,
+ AbstractSparkClient.ClientProtocol clientProtocol) {
+ this.queryId = queryId;
+ this.clientProtocol = clientProtocol;
+ }
+
+ @Override
+ public CommandProcessorResponse run(String command, byte[] hiveConfBytes) {
+ return (CommandProcessorResponse) this.clientProtocol.run(command, hiveConfBytes, this.queryId);
+ }
+
+ @Override
+ public boolean getResults(List res) {
+ return this.clientProtocol.getResults(this.queryId, res);
+ }
+
+ @Override
+ public CommandProcessorResponse compileAndRespond(String statement, byte[] hiveConfBytes) {
+ return (CommandProcessorResponse) this.clientProtocol.compileAndRespond(this.queryId, statement,
+ hiveConfBytes);
+ }
+
+ @Override
+ public CommandProcessorResponse run() {
+ return (CommandProcessorResponse) this.clientProtocol.run(this.queryId);
+ }
+
+ @Override
+ public boolean hasResultSet() {
+ return this.clientProtocol.hasResultSet(this.queryId);
+ }
+
+ @Override
+ public Schema getSchema() {
+ return KryoSerializer.deserialize(this.clientProtocol.getSchema(this.queryId), Schema.class);
+ }
+
+ @Override
+ public boolean isFetchingTable() {
+ return this.clientProtocol.isFetchingTable(this.queryId);
+ }
+
+ @Override
+ public void close() {
+ this.clientProtocol.closeDriver(this.queryId);
+ }
+
+ @Override
+ public void destroy() {
+ this.clientProtocol.destroyDriver(this.queryId);
+ }
+
+ @Override
+ public QueryDisplay getQueryDisplay() {
+ return KryoSerializer.deserialize(this.clientProtocol.getQueryDisplay(this.queryId),
+ QueryDisplay.class);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 02613f2ca3..117c6bcf9f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -35,6 +35,8 @@
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkWorkSubmitter;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkWorkSubmitterFactory;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkMetricsUtils;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage;
@@ -112,12 +114,9 @@ public int execute(DriverContext driverContext) {
int rc = 0;
perfLogger = SessionState.getPerfLogger();
- SparkSession sparkSession = null;
- SparkSessionManager sparkSessionManager = null;
+ SparkWorkSubmitter workSubmitter = null;
try {
printConfigInfo();
- sparkSessionManager = SparkSessionManagerImpl.getInstance();
- sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
SparkWork sparkWork = getWork();
sparkWork.setRequiredCounterPrefix(getOperatorCounters());
@@ -125,7 +124,8 @@ public int execute(DriverContext driverContext) {
// Submit the Spark job
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
- jobRef = sparkSession.submit(driverContext, sparkWork);
+ workSubmitter = SparkWorkSubmitterFactory.getSparkWorkSubmitter(conf);
+ jobRef = workSubmitter.submit(driverContext, sparkWork);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
// If the driver context has been shutdown (due to query cancellation) kill the Spark job
@@ -213,13 +213,8 @@ public int execute(DriverContext driverContext) {
}
finishTime = perfLogger.getEndTime(PerfLogger.SPARK_RUN_JOB);
Utilities.clearWork(conf);
- if (sparkSession != null && sparkSessionManager != null) {
- rc = close(rc);
- try {
- sparkSessionManager.returnSession(sparkSession);
- } catch (HiveException ex) {
- LOG.error("Failed to return the session to SessionManager", ex);
- }
+ if (workSubmitter != null) {
+ rc = workSubmitter.close(this, rc);
}
}
return rc;
@@ -355,7 +350,7 @@ static String sparkStatisticsToString(SparkStatistics sparkStatistic, int sparkJ
* Close will move the temp files into the right place for the fetch
* task. If the job has failed it will clean up the files.
*/
- private int close(int rc) {
+ public int close(int rc) {
try {
List ws = work.getAllWork();
for (BaseWork w: ws) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index fdc5361989..fc0f3272de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -55,6 +56,8 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.spark.SparkConf;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
/**
@@ -339,4 +342,35 @@ public static String reverseDNSLookupURL(String url) throws UnknownHostException
InetAddress address = InetAddress.getByName(uri.getHost());
return uri.getScheme() + "://" + address.getCanonicalHostName() + ":" + uri.getPort();
}
+
+ public static ObjectPair<Long, Integer> getMemoryAndCores(Logger log, SparkConf sparkConf,
+ int numExecutors, int defaultParallelism) {
+ // at start-up, we may be unable to get number of executors
+ if (numExecutors <= 0) {
+ return new ObjectPair<Long, Integer>(-1L, -1);
+ }
+ int executorMemoryInMB = Utils.memoryStringToMb(
+ sparkConf.get("spark.executor.memory", "512m"));
+ double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
+ long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
+ int totalCores;
+ String masterURL = sparkConf.get("spark.master");
+ if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
+ totalCores = sparkConf.contains("spark.default.parallelism") ?
+ sparkConf.getInt("spark.default.parallelism", 1) :
+ defaultParallelism;
+ totalCores = Math.max(totalCores, numExecutors);
+ } else {
+ int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
+ totalCores = numExecutors * coresPerExecutor;
+ }
+ totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
+
+ long memoryPerTaskInBytes = totalMemory / totalCores;
+ log.info("Hive on Spark application currently has number of executors: " + numExecutors
+ + ", total cores: " + totalCores + ", memory per executor: "
+ + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
+ return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
+ Integer.valueOf(totalCores));
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkLocalClientWorkSubmitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkLocalClientWorkSubmitter.java
new file mode 100644
index 0000000000..5f2460d98b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkLocalClientWorkSubmitter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import org.apache.hive.spark.client.RemoteDriver;
+
+
+/**
+ * A {@link SparkWorkSubmitter} that submits {@link SparkWork} objects, running them locally via a
+ * {@link LocalHiveSparkClient}.
+ */
+class SparkLocalClientWorkSubmitter implements SparkWorkSubmitter {
+
+ @Override
+ public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
+ JobMetricsListener jobMetricsListener = new JobMetricsListener();
+ RemoteDriver.getInstance().sc().sc().addSparkListener(jobMetricsListener);
+ return new LocalHiveSparkClient(RemoteDriver.getInstance().sc(), jobMetricsListener).execute(driverContext, sparkWork);
+ }
+
+ @Override
+ public int close(SparkTask sparkTask, int rc) {
+ return sparkTask.close(rc);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index f96a8f77ce..d163953bb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.SparkWork;
@@ -75,4 +76,6 @@
* Get an HDFS dir specific to the SparkSession
* */
Path getHDFSSessionDir() throws IOException;
-}
+
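+ /**
+ * Returns the {@link HiveSparkClient} backing this session.
+ */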
+ HiveSparkClient getHiveSparkClient();
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index c8cb1ac08c..c56c114959 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,35 +106,8 @@ public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) thro
@Override
public ObjectPair getMemoryAndCores() throws Exception {
- SparkConf sparkConf = hiveSparkClient.getSparkConf();
- int numExecutors = hiveSparkClient.getExecutorCount();
- // at start-up, we may be unable to get number of executors
- if (numExecutors <= 0) {
- return new ObjectPair(-1L, -1);
- }
- int executorMemoryInMB = Utils.memoryStringToMb(
- sparkConf.get("spark.executor.memory", "512m"));
- double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
- long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
- int totalCores;
- String masterURL = sparkConf.get("spark.master");
- if (masterURL.startsWith("spark") || masterURL.startsWith("local")) {
- totalCores = sparkConf.contains("spark.default.parallelism") ?
- sparkConf.getInt("spark.default.parallelism", 1) :
- hiveSparkClient.getDefaultParallelism();
- totalCores = Math.max(totalCores, numExecutors);
- } else {
- int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
- totalCores = numExecutors * coresPerExecutor;
- }
- totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
-
- long memoryPerTaskInBytes = totalMemory / totalCores;
- LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
- + ", total cores: " + totalCores + ", memory per executor: "
- + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
- return new ObjectPair(Long.valueOf(memoryPerTaskInBytes),
- Integer.valueOf(totalCores));
+ return SparkUtilities.getMemoryAndCores(LOG, hiveSparkClient.getSparkConf(),
+ hiveSparkClient.getExecutorCount(), hiveSparkClient.getDefaultParallelism());
}
@Override
@@ -258,12 +232,12 @@ public Path getHDFSSessionDir() throws IOException {
return scratchDir;
}
- public static String makeSessionId() {
+ private static String makeSessionId() {
return UUID.randomUUID().toString();
}
- @VisibleForTesting
- HiveSparkClient getHiveSparkClient() {
+ @Override
+ public HiveSparkClient getHiveSparkClient() {
return hiveSparkClient;
}
-}
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionWorkSubmitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionWorkSubmitter.java
new file mode 100644
index 0000000000..cdd034138f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionWorkSubmitter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link SparkWorkSubmitter} that submits {@link SparkWork} objects using the
+ * {@link SparkSession#submit(DriverContext, SparkWork)} method.
+ */
+class SparkSessionWorkSubmitter implements SparkWorkSubmitter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkSessionWorkSubmitter.class);
+
+ private final HiveConf conf;
+
+ private SparkSessionManager sparkSessionManager;
+ private SparkSession sparkSession;
+
+
+ SparkSessionWorkSubmitter(HiveConf conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
+ this.sparkSessionManager = SparkSessionManagerImpl.getInstance();
+ this.sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
+ return this.sparkSession.submit(driverContext, sparkWork);
+ }
+
+ @Override
+ public int close(SparkTask sparkTask, int rc) {
+ if (sparkSession != null && sparkSessionManager != null) {
+ rc = sparkTask.close(rc);
+ try {
+ sparkSessionManager.returnSession(sparkSession);
+ } catch (HiveException ex) {
+ LOG.error("Failed to return the session to SessionManager", ex);
+ }
+ }
+ return rc;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitter.java
new file mode 100644
index 0000000000..7f941b9511
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+
+/**
+ * Defines an API for submitting a {@link SparkWork} to be executed as a Spark job.
+ */
+public interface SparkWorkSubmitter {
+
+ SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+
+ int close(SparkTask sparkTask, int rc);
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitterFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitterFactory.java
new file mode 100644
index 0000000000..a75ffad8ec
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkWorkSubmitterFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.session;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+
+/**
+ * Creates {@link SparkWorkSubmitter}s.
+ */
+public class SparkWorkSubmitterFactory {
+
+ public static SparkWorkSubmitter getSparkWorkSubmitter(HiveConf hiveConf) {
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE)) {
+ return new SparkLocalClientWorkSubmitter();
+ } else {
+ return new SparkSessionWorkSubmitter(hiveConf);
+ }
+ }
+}
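For illustration (configuration values assumed): the factory keys purely off the session's
HiveConf, so the new flag switches SparkTask between the two submission paths.

  // Illustrative only.
  HiveConf conf = new HiveConf();
  conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE, true);
  SparkWorkSubmitter submitter = SparkWorkSubmitterFactory.getSparkWorkSubmitter(conf);
  // With the flag set, the SparkWork runs inside the RemoteDriver process via
  // SparkLocalClientWorkSubmitter; with the default (false) it goes through the usual
  // SparkSession path via SparkSessionWorkSubmitter.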
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index ab87c79c48..e02b0cc4cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -24,7 +24,11 @@
import java.util.Set;
import java.util.Stack;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hive.spark.client.RemoteDriver;
+import org.apache.spark.SparkConf;
+import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.ObjectPair;
@@ -255,27 +259,7 @@ private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws Sem
sparkMemoryAndCores = null;
return;
}
-
- SparkSessionManager sparkSessionManager = null;
- SparkSession sparkSession = null;
- try {
- sparkSessionManager = SparkSessionManagerImpl.getInstance();
- sparkSession = SparkUtilities.getSparkSession(
- context.getConf(), sparkSessionManager);
- sparkMemoryAndCores = sparkSession.getMemoryAndCores();
- } catch (HiveException e) {
- throw new SemanticException("Failed to get a Hive on Spark session", e);
- } catch (Exception e) {
- LOG.warn("Failed to get spark memory/core info, reducer parallelism may be inaccurate", e);
- } finally {
- if (sparkSession != null && sparkSessionManager != null) {
- try {
- sparkSessionManager.returnSession(sparkSession);
- } catch (HiveException ex) {
- LOG.error("Failed to return the session to SessionManager: " + ex, ex);
- }
- }
- }
+ sparkMemoryAndCores = new SparkMemoryAndCoresFetcherFactory(context.getConf())
+ .createSparkMemoryAndCoresFetcher().getSparkMemoryAndCores();
}
-
-}
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcher.java
new file mode 100644
index 0000000000..97c2f34c3e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Defines an API for fetching the memory and cores available to the Spark application associated
+ * with the current Hive session.
+ */
+interface SparkMemoryAndCoresFetcher {
+
+ ObjectPair<Long, Integer> getSparkMemoryAndCores() throws SemanticException;
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcherFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcherFactory.java
new file mode 100644
index 0000000000..5f1aa6ac02
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMemoryAndCoresFetcherFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Creates {@link SparkMemoryAndCoresFetcher}s.
+ */
+public class SparkMemoryAndCoresFetcherFactory {
+
+ private final HiveConf hiveConf;
+
+ SparkMemoryAndCoresFetcherFactory(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ }
+
+ public SparkMemoryAndCoresFetcher createSparkMemoryAndCoresFetcher() {
+ if (this.hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_CONTAINER_SERVICE)) {
+ return new SparkRemoteDriverMemoryAndCoresFetcher(this.hiveConf);
+ } else {
+ return new SparkSessionMemoryAndCoresFetcher(this.hiveConf);
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkRemoteDriverMemoryAndCoresFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkRemoteDriverMemoryAndCoresFetcher.java
new file mode 100644
index 0000000000..254067954b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkRemoteDriverMemoryAndCoresFetcher.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hive.spark.client.RemoteDriver;
+
+import org.apache.spark.SparkConf;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SparkRemoteDriverMemoryAndCoresFetcher implements SparkMemoryAndCoresFetcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SparkRemoteDriverMemoryAndCoresFetcher.class);
+
+ private final HiveConf hiveConf;
+
+ public SparkRemoteDriverMemoryAndCoresFetcher(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ }
+
+ @Override
+ public ObjectPair<Long, Integer> getSparkMemoryAndCores() {
+ int numExecutors = RemoteDriver.getInstance().sc().sc().getExecutorMemoryStatus().size() - 1;
+ int defaultParallelism = RemoteDriver.getInstance().sc().sc().defaultParallelism();
+ SparkConf sparkConf = HiveSparkClientFactory.generateSparkConf(
+ HiveSparkClientFactory.initiateSparkConf(this.hiveConf, null));
+ return SparkUtilities.getMemoryAndCores(LOG, sparkConf, numExecutors, defaultParallelism);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSessionMemoryAndCoresFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSessionMemoryAndCoresFetcher.java
new file mode 100644
index 0000000000..86ef17a1dc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSessionMemoryAndCoresFetcher.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SparkSessionMemoryAndCoresFetcher implements SparkMemoryAndCoresFetcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SparkSessionMemoryAndCoresFetcher.class);
+
+ private final HiveConf hiveConf;
+
+ SparkSessionMemoryAndCoresFetcher(HiveConf hiveConf) {
+ this.hiveConf = hiveConf;
+ }
+
+ @Override
+ public ObjectPair<Long, Integer> getSparkMemoryAndCores() throws SemanticException {
+ SparkSessionManager sparkSessionManager = null;
+ SparkSession sparkSession = null;
+ try {
+ sparkSessionManager = SparkSessionManagerImpl.getInstance();
+ sparkSession = SparkUtilities.getSparkSession(this.hiveConf, sparkSessionManager);
+ return sparkSession.getMemoryAndCores();
+ } catch (HiveException e) {
+ throw new SemanticException("Failed to get a Hive on Spark session", e);
+ } catch (Exception e) {
+ LOG.warn("Failed to get spark memory/core info, reducer parallelism may be inaccurate", e);
+ } finally {
+ if (sparkSession != null && sparkSessionManager != null) {
+ try {
+ sparkSessionManager.returnSession(sparkSession);
+ } catch (HiveException ex) {
+ LOG.error("Failed to return the session to SessionManager: " + ex, ex);
+ }
+ }
+ }
+ return null;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
index f8b6a97c91..36c6f852f7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
@@ -121,7 +121,11 @@ public static CommandProcessor get(String[] cmd, @Nonnull HiveConf conf) throws
if (isBlank(cmd[0])) {
return null;
} else {
- return DriverFactory.newDriver(conf);
+ try {
+ return DriverFactory.newRemoteProcessDriver(conf);
+ } catch (IOException | HiveException e) {
+ throw new SQLException(e);
+ }
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 81864f5f67..d026278b17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -65,6 +65,7 @@
import org.apache.hadoop.hive.metastore.cache.CachedStore;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.RemoteProcessLauncherFactory;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.Registry;
@@ -315,6 +316,29 @@
private List cleanupItems = new LinkedList();
+ private final AtomicBoolean launchedRemoteProcess = new AtomicBoolean();
+
+ /**
+ * Launch the {@link org.apache.hadoop.hive.ql.RemoteProcessLauncher} if it hasn't been
+ * launched yet.
+ */
+ public void launchRemoteProcess() throws IOException, HiveException {
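+ // Double-checked locking: the unsynchronized read skips the lock on every query after the
+ // first, while the synchronized re-check guarantees the launcher is started only once.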
+ if (!launchedRemoteProcess.get()) {
+ synchronized (launchedRemoteProcess) {
+ if (!launchedRemoteProcess.get()) {
+ LOG.debug("Creating remote process launcher");
+
+ getPerfLogger().PerfLogBegin("RemoteProcessLauncher", "launch");
+ RemoteProcessLauncherFactory.getRemoteProcessLauncher(sessionConf).launch();
+ getPerfLogger().PerfLogEnd("RemoteProcessLauncher", "launch");
+
+ LOG.debug("Remote process launcher successfully launched");
+ Preconditions.checkState(launchedRemoteProcess.compareAndSet(false, true));
+ }
+ }
+ }
+ }
+
public HiveConf getConf() {
return sessionConf;
}
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 9a07fa1760..4dd0e35cda 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -157,7 +157,7 @@ private void setupSessionIO(SessionState sessionState) {
public void prepare(QueryState queryState) throws HiveSQLException {
setState(OperationState.RUNNING);
try {
- driver = DriverFactory.newDriver(queryState, getParentSession().getUserName(), queryInfo);
+ driver = DriverFactory.newRemoteProcessDriver(queryState, getParentSession().getUserName(), queryInfo);
// Start the timer thread for cancelling the query when query timeout is reached
// queryTimeout == 0 means no timeout
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
index ed9222cfec..27d6ce45cb 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
@@ -41,13 +41,13 @@
import java.io.Writer;
import java.net.URI;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -77,7 +77,7 @@
* It uses this protocol to submit {@link Job}s to the {@link RemoteDriver}.
*
*/
-abstract class AbstractSparkClient implements SparkClient {
+public abstract class AbstractSparkClient implements SparkClient {
private static final long serialVersionUID = 1L;
@@ -228,6 +228,11 @@ public void cancel(String jobId) {
protocol.cancel(jobId);
}
+ @Override
+ public ClientProtocol getClientProtocol() {
+ return this.protocol;
+ }
+
private Future startDriver(final RpcServer rpcServer, final String clientId,
final String secret) throws IOException {
final String serverAddress = rpcServer.getAddress();
@@ -306,6 +311,21 @@ public void cancel(String jobId) {
}
}
+ String testTmpDir = System.getProperty("test.tmp.dir");
+ if (testTmpDir != null) {
+ addDriverSystemProperty("test.tmp.dir", testTmpDir);
+ }
+
+ String testTmpDirUri = System.getProperty("test.tmp.dir.uri");
+ if (testTmpDirUri != null) {
+ addDriverSystemProperty("test.tmp.dir.uri", testTmpDirUri);
+ }
+
+ String testSrcTables = System.getProperty("test.src.tables");
+ if (testSrcTables != null) {
+ addDriverSystemProperty("test.src.tables", testSrcTables);
+ }
+
Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
try {
allProps.store(writer, "Spark Context configuration");
@@ -423,7 +443,15 @@ protected abstract void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyT
protected abstract void addExecutorCores(String executorCores);
- private class ClientProtocol extends BaseProtocol {
+ protected abstract void addDriverSystemProperty(String key, String value);
+
+ public class ClientProtocol extends BaseProtocol {
+
+ private final Map msgResponses;
+
+ public ClientProtocol() {
+ this.msgResponses = Maps.newConcurrentMap();
+ }
JobHandleImpl submit(Job job, List> listeners) {
final String jobId = UUID.randomUUID().toString();
@@ -525,6 +553,116 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
}
}
+ // We define the protocol for the RemoteProcessDriver in the same class because the underlying
+ // RPC implementation only supports specifying a single RpcDispatcher and it doesn't support
+ // polymorphism
+
+ public void startSession(byte[] hiveConfBytes) {
+ LOG.debug("Sending startSession request");
+ driverRpc.call(new StartSession(hiveConfBytes));
+ }
+
+ public Exception run(String command, byte[] hiveConfBytes, String queryId) {
+ LOG.debug("Sending run command request for query id " + queryId);
+ return sendMessage(queryId, new RunCommand(command, hiveConfBytes, queryId));
+ }
+
+ public boolean getResults(String queryId, List res) {
+ LOG.debug("Sending get results request for query id " + queryId);
+ CommandResults commandResults = sendMessage(queryId, new GetResults(queryId));
+ res.addAll(commandResults.res);
+ return commandResults.moreResults;
+ }
+
+ public Exception compileAndRespond(String queryId, String command, byte[] hiveConfBytes) {
+ LOG.debug("Sending run command request for query id " + queryId);
+ return sendMessage(queryId, new CompileCommand(command, hiveConfBytes, queryId));
+ }
+
+ public Exception run(String queryId) {
+ LOG.debug("Sending run command request for query id " + queryId);
+ return sendMessage(queryId, new RunCommand(null, null, queryId));
+ }
+
+ public boolean hasResultSet(String queryId) {
+ LOG.debug("Sending hasResultSet request for queryId " + queryId);
+ return sendMessage(queryId, new HasResultSet(queryId));
+ }
+
+ public byte[] getSchema(String queryId) {
+ LOG.debug("Sending getSchema request for queryId " + queryId);
+ return sendMessage(queryId, new GetSchema(queryId));
+ }
+
+ public boolean isFetchingTable(String queryId) {
+ LOG.debug("Sending isFetchingTable request for queryId " + queryId);
+ return sendMessage(queryId, new IsFetchingTable(queryId));
+ }
+
+ public void closeDriver(String queryId) {
+ LOG.debug("Sending closeDriver request for queryId " + queryId);
+ driverRpc.call(new CloseDriverRequest(queryId));
+ }
+
+ public void destroyDriver(String queryId) {
+ LOG.debug("Sending destroyDriver request for queryId " + queryId);
+ driverRpc.call(new DestroyDriverRequest(queryId));
+ }
+
+ public byte[] getQueryDisplay(String queryId) {
+ LOG.debug("Sending getQueryDisplay request for queryId " + queryId);
+ return sendMessage(queryId, new GetQueryDisplayRequest(queryId));
+ }
+
+ /**
+ * Sends a message to the {@link RemoteDriver} via the Driver
+ * {@link org.apache.hive.spark.client.rpc.Rpc} and blocks until the results are received by
+ * the corresponding "handle" method.
+ */
+ private <T> T sendMessage(String queryId, Object msg) {
+ BlockingQueue<T> msgResponse = new ArrayBlockingQueue<>(1);
+ Preconditions.checkState(msgResponses.putIfAbsent(queryId, msgResponse) == null);
+ driverRpc.call(msg);
+ T response;
+ try {
+ response = msgResponse.take();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ msgResponses.remove(queryId);
+ return response;
+ }
+
+ private void handle(ChannelHandlerContext ctx, CommandResults msg) {
+ LOG.debug("Received command results response for query id " + msg.queryId);
+ handleMessageResponse(msg.queryId, msg);
+ }
+
+ private void handle(ChannelHandlerContext ctx, CommandProcessorResponseMessage msg) {
+ LOG.debug("Received command processor response for query id " + msg.queryId);
+ handleMessageResponse(msg.queryId, msg.commandProcessorResponse);
+ }
+
+ private void handle(ChannelHandlerContext ctx, HasResultSetResponse msg) {
+ LOG.debug("Received has result set response for query id " + msg.queryId);
+ handleMessageResponse(msg.queryId, msg.hasResultSet);
+ }
+
+ private void handle(ChannelHandlerContext ctx, GetSchemaResponse msg) {
+ LOG.debug("Received has getSchema response for query id " + msg.queryId);
+ handleMessageResponse(msg.queryId, msg.schema);
+ }
+
+ private void handle(ChannelHandlerContext ctx, IsFetchingTableResponse msg) {
+ LOG.debug("Received has isFetchingTable response for query id " + msg.queryId);
+ handleMessageResponse(msg.queryId, msg.isFetchingTableResponse);
+ }
+
+ private void handleMessageResponse(String queryId, Object response) {
+ Preconditions.checkState(msgResponses.get(queryId).remainingCapacity() == 1);
+ msgResponses.get(queryId).add(response);
+ }
+
@Override
protected String name() {
return "HiveServer2 to Remote Spark Driver Connection";
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 558ed80ee8..d7e85c614f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -18,6 +18,7 @@
package org.apache.hive.spark.client;
import java.io.Serializable;
+import java.util.List;
import org.apache.hive.spark.client.metrics.Metrics;
import org.apache.hive.spark.client.rpc.RpcDispatcher;
@@ -222,4 +223,170 @@ public String toString() {
'}';
}
}
+
+ protected static class RunCommand implements Serializable {
+
+ final String command;
+ final byte[] hiveConfBytes;
+ final String queryId;
+
+ RunCommand(String command, byte[] hiveConfBytes, String queryId) {
+ this.command = command;
+ this.hiveConfBytes = hiveConfBytes;
+ this.queryId = queryId;
+ }
+ }
+
+ protected static class StartSession implements Serializable {
+
+ final byte[] hiveConfBytes;
+
+ StartSession(byte[] hiveConfBytes) {
+ this.hiveConfBytes = hiveConfBytes;
+ }
+ }
+
+ protected static class HasResultSet implements Serializable {
+
+ final String queryId;
+
+ HasResultSet(String queryId) {
+ this.queryId = queryId;
+ }
+ }
+
+ protected static class GetSchema implements Serializable {
+
+ final String queryId;
+
+ GetSchema(String queryId) {
+ this.queryId = queryId;
+ }
+ }
+
+ protected static class IsFetchingTable implements Serializable {
+
+ final String queryId;
+
+ IsFetchingTable(String queryId) {
+ this.queryId = queryId;
+ }
+ }
+
+ protected static class CloseDriverRequest implements Serializable {
+
+ final String queryId;
+
+ CloseDriverRequest(String queryId) {
+ this.queryId = queryId;
+ }
+ }
+
+ protected static class DestroyDriverRequest implements Serializable {
+
+ final String queryId;
+
+ DestroyDriverRequest(String queryId) {
+ this.queryId = queryId;
+ }
+ }
+
+ protected static class GetQueryDisplayRequest implements Serializable {
+
+ final String queryId;
+
+ GetQueryDisplayRequest(String queryId) {
+ this.queryId = queryId;
+ }
+ }
+
+ protected static class GetSchemaResponse implements Serializable {
+
+ final String queryId;
+ final byte[] schema;
+
+ GetSchemaResponse(String queryId, byte[] schema) {
+ this.queryId = queryId;
+ this.schema = schema;
+ }
+ }
+
+ protected static class HasResultSetResponse implements Serializable {
+
+ final String queryId;
+ final boolean hasResultSet;
+
+ HasResultSetResponse(String queryId, boolean hasResultSet) {
+ this.queryId = queryId;
+ this.hasResultSet = hasResultSet;
+ }
+ }
+
+ protected static class IsFetchingTableResponse implements Serializable {
+
+ final String queryId;
+ final boolean isFetchingTableResponse;
+
+ IsFetchingTableResponse(String queryId, boolean isFetchingTableResponse) {
+ this.queryId = queryId;
+ this.isFetchingTableResponse = isFetchingTableResponse;
+ }
+ }
+
+ protected static class QueryDisplayResponse implements Serializable {
+
+ final String queryId;
+ final byte[] res;
+
+ QueryDisplayResponse(String queryId, byte[] res) {
+ this.queryId = queryId;
+ this.res = res;
+ }
+ }
+
+ protected static class CompileCommand implements Serializable {
+
+ final String command;
+ final byte[] hiveConfBytes;
+ final String queryId;
+
+ CompileCommand(String command, byte[] hiveConfBytes, String queryId) {
+ this.command = command;
+ this.hiveConfBytes = hiveConfBytes;
+ this.queryId = queryId;
+ }
+ }
+
+ protected static class GetResults implements Serializable {
+
+ final String queryId;
+
+ GetResults(String queryId) {
+ this.queryId = queryId;
+ }
+ }
+
+ protected static class CommandResults implements Serializable {
+
+ final List res;
+ final String queryId;
+ final boolean moreResults;
+
+ CommandResults(List res, String queryId, boolean moreResults) {
+ this.res = res;
+ this.queryId = queryId;
+ this.moreResults = moreResults;
+ }
+ }
+
+ protected static class CommandProcessorResponseMessage implements Serializable {
+
+ final String queryId;
+ final Exception commandProcessorResponse;
+
+ CommandProcessorResponseMessage(String queryId, Exception commandProcessorResponse) {
+ this.queryId = queryId;
+ this.commandProcessorResponse = commandProcessorResponse;
+ }
+ }
}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 8130860f2b..d3ce0f5359 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -25,6 +25,8 @@
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -71,6 +73,7 @@
public class RemoteDriver {
private static final Logger LOG = LoggerFactory.getLogger(RemoteDriver.class);
+ private static RemoteDriver instance;
private final Map> activeJobs;
private final Object jcLock;
@@ -83,7 +86,7 @@
private final File localTmpDir;
// Used to queue up requests while the SparkContext is being created.
- private final List> jobQueue = Lists.newLinkedList();
+ private final List jobQueue = Lists.newLinkedList();
// jc is effectively final, but it has to be volatile since it's accessed by different
// threads while the constructor is running.
@@ -185,12 +188,16 @@ public String toString() {
}
synchronized (jcLock) {
- for (Iterator> it = jobQueue.iterator(); it.hasNext();) {
+ for (Iterator it = jobQueue.iterator(); it.hasNext();) {
it.next().submit();
}
}
}
+ public static RemoteDriver getInstance() {
+ return instance;
+ }
+
private void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (running) {
@@ -216,7 +223,7 @@ private void run() throws InterruptedException {
}
}
- private void submit(JobWrapper> job) {
+ private void submit(Submittable job) {
synchronized (jcLock) {
if (jc != null) {
job.submit();
@@ -275,8 +282,16 @@ private String getArg(String[] args, int keyIdx) {
return args[valIdx];
}
+ public JavaSparkContext sc() {
+ return this.jc.sc();
+ }
+
private class DriverProtocol extends BaseProtocol {
+ private RemoteProcessDriverExecutorFactory remoteProcessDriverExecutorFactory;
+ private final Map commands = Maps.newConcurrentMap();
+ private final ExecutorService es = Executors.newSingleThreadExecutor();
+
Future sendError(Throwable error) {
LOG.debug("Send error to Client: {}", Throwables.getStackTraceAsString(error));
return clientRpc.call(new Error(Throwables.getStackTraceAsString(error)));
@@ -318,6 +333,7 @@ private void handle(ChannelHandlerContext ctx, CancelJob msg) {
private void handle(ChannelHandlerContext ctx, EndSession msg) {
LOG.debug("Shutting down due to EndSession request.");
shutdown(null);
+ es.shutdownNow();
}
private void handle(ChannelHandlerContext ctx, JobRequest msg) {
@@ -356,13 +372,160 @@ public void call(JavaFutureAction> future,
}
}
+ // We define the protocol for the RemoteProcessDriver in the same class because the underlying
+ // RPC implementation only supports specifying a single RpcDispatcher and it doesn't support
+ // polymorphism
+
+ private void handle(ChannelHandlerContext ctx, RunCommand msg) {
+ LOG.debug("Received client run command request for query id " + msg.queryId);
+
+ if (msg.command != null) {
+ submit(() -> {
+ es.submit(() -> {
+ RemoteProcessDriverExecutor remoteProcessDriverExecutor =
+ remoteProcessDriverExecutorFactory.createRemoteProcessDriverExecutor(msg.hiveConfBytes);
+ commands.put(msg.queryId, remoteProcessDriverExecutor);
+ Exception commandProcessorResponse = remoteProcessDriverExecutor.run(msg.command);
+ clientRpc.call(new CommandProcessorResponseMessage(msg.queryId, commandProcessorResponse));
+ });
+ });
+ } else {
+ submit(() -> {
+ es.submit(() -> {
+ Exception res = commands.get(msg.queryId).run();
+ clientRpc.call(new CommandProcessorResponseMessage(msg.queryId, res));
+ });
+ });
+ }
+ }
+
+ private void handle(ChannelHandlerContext ctx, CompileCommand msg) {
+ LOG.debug("Received client compile command request");
+
+ submit(() -> {
+ es.submit(() -> {
+ RemoteProcessDriverExecutor remoteProcessDriverExecutor =
+ remoteProcessDriverExecutorFactory.createRemoteProcessDriverExecutor(msg.hiveConfBytes);
+ commands.put(msg.queryId, remoteProcessDriverExecutor);
+ Exception commandProcessorResponse =
+ remoteProcessDriverExecutor.compileAndRespond(msg.command);
+ clientRpc.call(new CommandProcessorResponseMessage(msg.queryId, commandProcessorResponse));
+ });
+ });
+ }
+
+ private void handle(ChannelHandlerContext ctx, GetResults msg) {
+ LOG.debug("Received client get results request");
+
+ submit(() -> {
+ es.submit(() -> {
+ List res = new ArrayList();
+ try {
+ boolean moreResults = commands.get(msg.queryId).getResults(res);
+ clientRpc.call(new CommandResults(res, msg.queryId, moreResults));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ });
+ }
+
+ private void handle(ChannelHandlerContext ctx, HasResultSet msg) {
+ LOG.debug("Received has result set request for query id " + msg.queryId);
+
+ submit(() -> {
+ es.submit(() -> {
+ boolean res = commands.get(msg.queryId).hasResultSet();
+ clientRpc.call(new HasResultSetResponse(msg.queryId, res));
+ });
+ });
+ }
+
+ private void handle(ChannelHandlerContext ctx, GetSchema msg) {
+ LOG.debug("Received has get schema request for query id " + msg.queryId);
+
+ submit(() -> {
+ es.submit(() -> {
+ byte[] res = commands.get(msg.queryId).getSchema();
+ clientRpc.call(new GetSchemaResponse(msg.queryId, res));
+ });
+ });
+ }
+
+ private void handle(ChannelHandlerContext ctx, IsFetchingTable msg) {
+ LOG.debug("Received is fetching table request for query id " + msg.queryId);
+
+ submit(() -> {
+ es.submit(() -> {
+ boolean res = commands.get(msg.queryId).isFetchingTable();
+ clientRpc.call(new IsFetchingTableResponse(msg.queryId, res));
+ });
+ });
+ }
+
+ private void handle(ChannelHandlerContext ctx, CloseDriverRequest msg) {
+ LOG.debug("Received has close driver request for query id " + msg.queryId);
+
+ submit(() -> {
+ es.submit(() -> {
+ commands.get(msg.queryId).close();
+ });
+ });
+ }
+
+ private void handle(ChannelHandlerContext ctx, GetQueryDisplayRequest msg) {
+ LOG.debug("Received has getQueryDisplay request for query id " + msg.queryId);
+
+ submit(() -> {
+ es.submit(() -> {
+ byte[] res = commands.get(msg.queryId).getQueryDisplay();
+ clientRpc.call(new QueryDisplayResponse(msg.queryId, res));
+ });
+ });
+ }
+
+ private void handle(ChannelHandlerContext ctx, DestroyDriverRequest msg) {
+ LOG.debug("Received has destroy driver request for query id " + msg.queryId);
+
+ submit(() -> {
+ es.submit(() -> {
+ commands.get(msg.queryId).destroy();
+ });
+ });
+ }
+
+ private void handle(ChannelHandlerContext ctx, StartSession msg) {
+ LOG.debug("Received start session request");
+
+ submit(() -> {
+ es.submit(() -> {
+ this.remoteProcessDriverExecutorFactory = createRemoteProcessDriverExecutorFactory(
+ msg.hiveConfBytes);
+ });
+ });
+ }
+
+ private RemoteProcessDriverExecutorFactory createRemoteProcessDriverExecutorFactory(byte[] hiveConfBytes) {
+ try {
+ return (RemoteProcessDriverExecutorFactory) Class.forName("org.apache.hadoop.hive.ql.exec" +
+ ".spark.RemoteProcessDriverExecutorFactoryImpl").getConstructor(byte[].class)
+ .newInstance(hiveConfBytes);
+ } catch (ClassNotFoundException | IllegalAccessException | InstantiationException |
+ NoSuchMethodException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public String name() {
return "Remote Spark Driver to HiveServer2 Connection";
}
}
- private class JobWrapper implements Callable {
+ private class JobWrapper implements Callable, Submittable {
private final BaseProtocol.JobRequest req;
private final List> jobs;
@@ -443,7 +606,8 @@ public void call(JavaFutureAction> future,
return null;
}
- void submit() {
+ @Override
+ public void submit() {
this.future = executor.submit(this);
}
@@ -557,6 +721,7 @@ private String getClientId(Integer jobId) {
public static void main(String[] args) throws Exception {
RemoteDriver rd = new RemoteDriver(args);
+ RemoteDriver.instance = rd;
try {
rd.run();
} catch (Exception e) {
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutor.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutor.java
new file mode 100644
index 0000000000..6fa38937b5
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Executes driver methods inside a remote process.
+ */
+public interface RemoteProcessDriverExecutor {
+
+ Exception run(String command);
+
+ boolean getResults(List res) throws IOException;
+
+ Exception run();
+
+ Exception compileAndRespond(String command);
+
+ boolean hasResultSet();
+
+ byte[] getSchema();
+
+ boolean isFetchingTable();
+
+ void close();
+
+ void destroy();
+
+ byte[] getQueryDisplay();
+}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutorFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutorFactory.java
new file mode 100644
index 0000000000..0e512da6d2
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteProcessDriverExecutorFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+/**
+ * Creates a new {@link RemoteProcessDriverExecutor} for a given
+ * {@link org.apache.hadoop.hive.conf.HiveConf}.
+ */
+public interface RemoteProcessDriverExecutorFactory {
+
+ RemoteProcessDriverExecutor createRemoteProcessDriverExecutor(byte[] hiveConfBytes);
+}
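RemoteDriver loads an implementation of this interface reflectively (see
createRemoteProcessDriverExecutorFactory in the RemoteDriver diff above), presumably because
spark-client cannot depend on the ql module that provides it. A hypothetical skeleton of what that
reflection contract requires, namely a public single-argument byte[] constructor, is sketched
below; the actual RemoteProcessDriverExecutorFactoryImpl shipped elsewhere in this patch may
differ.

  // Hypothetical skeleton only, not the implementation shipped with this patch.
  public class RemoteProcessDriverExecutorFactoryImpl implements RemoteProcessDriverExecutorFactory {

    private final byte[] hiveConfBytes; // serialized HiveConf delivered by the StartSession message

    // The single byte[] constructor is what RemoteDriver's
    // getConstructor(byte[].class).newInstance(hiveConfBytes) call expects.
    public RemoteProcessDriverExecutorFactoryImpl(byte[] hiveConfBytes) {
      this.hiveConfBytes = hiveConfBytes;
    }

    @Override
    public RemoteProcessDriverExecutor createRemoteProcessDriverExecutor(byte[] hiveConfBytes) {
      // A real implementation would construct a ql-side executor (e.g. one wrapping a Driver)
      // configured from the given HiveConf bytes; omitted in this sketch.
      throw new UnsupportedOperationException("illustrative stub");
    }
  }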
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
index 913889951e..2b66744fde 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
@@ -23,6 +23,7 @@
import java.util.concurrent.Future;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hive.spark.client.rpc.Rpc;
/**
* Defines the API for the Spark remote client.
@@ -117,4 +118,11 @@
* @param jobId the jobId to cancel
*/
void cancel(String jobId);
+
+ /**
+ * Returns the client protocol associated with this client. The client protocol extends
+ * {@link org.apache.hive.spark.client.rpc.RpcDispatcher} and defines the client-side
+ * protocol for interacting with the {@link RemoteDriver}.
+ */
+ AbstractSparkClient.ClientProtocol getClientProtocol();
}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java
index d45b77f0b8..1c123514c6 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java
@@ -177,6 +177,11 @@ protected void addExecutorCores(String executorCores) {
getSparkLauncher().addSparkArg("--executor-cores", executorCores);
}
+ @Override
+ protected void addDriverSystemProperty(String key, String value) {
+    // TODO: driver system properties are not yet forwarded when launching via the in-process launcher
+ }
+
private AbstractLauncher getSparkLauncher() {
if (this.sparkLauncher == null) {
this.sparkLauncher = new InProcessLauncher();
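The in-process launcher currently leaves addDriverSystemProperty unimplemented. One possible way to fill in the TODO, shown only as a sketch, is to route the property through Spark's spark.driver.extraJavaOptions setting; a complete implementation would also need to append to, rather than overwrite, any previously configured options.

@Override
protected void addDriverSystemProperty(String key, String value) {
  // Sketch only: forward the property as a driver JVM option.
  getSparkLauncher().setConf("spark.driver.extraJavaOptions", "-D" + key + "=" + value);
}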
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
index 31e89b8fa0..3c8555eccb 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -155,6 +155,12 @@ protected void addExecutorCores(String executorCores) {
argv.add(executorCores);
}
+ @Override
+ protected void addDriverSystemProperty(String key, String value) {
+ argv.add("--driver-java-options");
+ argv.add("\"-D" + key + "=" + value + "\"");
+ }
+
private String getSparkJobCredentialProviderPassword() {
if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) {
return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD");
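For the spark-submit path, each driver system property becomes a --driver-java-options argument and ultimately surfaces as a JVM system property inside the launched driver. A small illustration (the property name is made up):

// Given addDriverSystemProperty("hive.remote.process.enabled", "true"), argv gains:
//   --driver-java-options
//   "-Dhive.remote.process.enabled=true"
// Inside the driver JVM the value can then be read back as a system property:
boolean remoteProcess = Boolean.parseBoolean(System.getProperty("hive.remote.process.enabled"));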
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/Submittable.java b/spark-client/src/main/java/org/apache/hive/spark/client/Submittable.java
new file mode 100644
index 0000000000..e3ae6ce16e
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/Submittable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+/**
+ * Defines a function that is "submittable" to some external runner, such as an
+ * {@link java.util.concurrent.ExecutorService}.
+ */
+public interface Submittable {
+
+ void submit();
+}
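Since Submittable declares a single abstract method, it can be written as a lambda and adapted to any runner. A hedged sketch against a plain ExecutorService:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: the work performed by the lambda is illustrative.
ExecutorService pool = Executors.newSingleThreadExecutor();
Submittable work = () -> System.out.println("work submitted to the runner");
pool.submit(work::submit);   // adapt Submittable#submit to Runnable
pool.shutdown();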