commit 0bd04fed5bbcb47dfed0f796bd65598391b16885 Author: Sahil Takiar Date: Thu Jan 25 17:36:30 2018 -0800 HIVE-18533: Add option to use InProcessLauncher to submit spark jobs diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index aedd1ec975..96749224e1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3607,6 +3607,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."), SPARK_STAGE_MAX_TASKS("hive.spark.stage.max.tasks", -1, "The maximum number of tasks a stage in a Spark job may have.\n" + "If a Spark job stage contains more tasks than the maximum, the job will be cancelled. A value of -1 means no limit."), + SPARK_CLIENT_TYPE("hive.spark.client.type", SparkClientType.SPARK_SUBMIT_CLIENT.toString(), + "Controls how the Spark application is launched. If " + SparkClientType.SPARK_SUBMIT_CLIENT + + " is specified (default) then the spark-submit shell script is used to launch the Spark " + + "app. If " + SparkClientType.SPARK_LAUNCHER_CLIENT + " is specified then Spark's " + + "SparkLauncher is used to programmatically launch the app."), NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true, @@ -5118,4 +5123,8 @@ public void verifyAndSetAll(Map overlay) { return ret; } + public enum SparkClientType { + SPARK_SUBMIT_CLIENT, + SPARK_LAUNCHER_CLIENT + } } diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml index 6c633623ab..1bb3c7d1f7 100644 --- a/data/conf/spark/yarn-client/hive-site.xml +++ b/data/conf/spark/yarn-client/hive-site.xml @@ -26,6 +26,11 @@ + + hive.spark.client.type + SPARK_LAUNCHER_CLIENT + + hadoop.tmp.dir ${test.tmp.dir}/hadoop-tmp diff --git a/itests/qtest-spark/pom.xml b/itests/qtest-spark/pom.xml index 72b13a1e63..fb51963019 100644 --- a/itests/qtest-spark/pom.xml +++ b/itests/qtest-spark/pom.xml @@ -63,6 +63,12 @@ + + org.apache.spark + spark-yarn_${scala.binary.version} + ${spark.version} + test + org.eclipse.jetty jetty-util diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java similarity index 72% rename from spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java rename to spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java index 665ed92898..7f887458a1 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java @@ -25,7 +25,6 @@ import com.google.common.base.Strings; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Resources; @@ -38,24 +37,21 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; -import java.io.PrintStream; import java.io.Serializable; import java.io.Writer; import java.net.URI; import java.net.URL; -import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import 
java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hive.common.log.LogRedirector; -import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.Utils; @@ -68,10 +64,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class SparkClientImpl implements SparkClient { +abstract class AbstractSparkClient implements SparkClient { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkClient.class); private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds @@ -83,22 +79,22 @@ private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath"; private static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"; - private final Map conf; + protected final Map conf; private final HiveConf hiveConf; - private final Thread driverThread; + private final Future driverFuture; private final Map> jobs; private final Rpc driverRpc; private final ClientProtocol protocol; - private volatile boolean isAlive; + protected volatile boolean isAlive; - SparkClientImpl(RpcServer rpcServer, Map conf, HiveConf hiveConf, + protected AbstractSparkClient(RpcServer rpcServer, Map conf, HiveConf hiveConf, String sessionid) throws IOException { this.conf = conf; this.hiveConf = hiveConf; this.jobs = Maps.newConcurrentMap(); String secret = rpcServer.createSecret(); - this.driverThread = startDriver(rpcServer, sessionid, secret); + this.driverFuture = startDriver(rpcServer, sessionid, secret); this.protocol = new ClientProtocol(); try { @@ -118,12 +114,14 @@ errorMsg = "Error while waiting for client to connect."; } LOG.error(errorMsg, e); - driverThread.interrupt(); + driverFuture.cancel(true); try { - driverThread.join(); + driverFuture.get(); } catch (InterruptedException ie) { // Give up. 
LOG.warn("Interrupted before driver thread was finished.", ie); + } catch (ExecutionException ee) { + LOG.error("Driver thread failed", ee); } throw Throwables.propagate(e); } @@ -168,15 +166,16 @@ public void stop() { } } - long endTime = System.currentTimeMillis() + DEFAULT_SHUTDOWN_TIMEOUT; try { - driverThread.join(DEFAULT_SHUTDOWN_TIMEOUT); + driverFuture.get(DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + LOG.error("Exception while waiting for driver future to complete", e); + } catch (TimeoutException e) { + LOG.warn("Timed out shutting down remote driver, cancelling..."); + driverFuture.cancel(true); } catch (InterruptedException ie) { LOG.debug("Interrupted before driver thread was finished."); - } - if (endTime - System.currentTimeMillis() <= 0) { - LOG.warn("Timed out shutting down remote driver, interrupting..."); - driverThread.interrupt(); + driverFuture.cancel(true); } } @@ -205,13 +204,13 @@ public boolean isActive() { return isAlive && driverRpc.isActive(); } - void cancel(String jobId) { + @Override + public void cancel(String jobId) { protocol.cancel(jobId); } - private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret) + private Future startDriver(final RpcServer rpcServer, final String clientId, final String secret) throws IOException { - Runnable runnable; final String serverAddress = rpcServer.getAddress(); final String serverPort = String.valueOf(rpcServer.getPort()); @@ -303,6 +302,8 @@ private Thread startDriver(final RpcServer rpcServer, final String clientId, fin writer.close(); } + addSparkHome(sparkHome); + // Define how to pass options to the child process. If launching in client (or local) // mode, the driver options need to be passed directly on the command line. Otherwise, // SparkSubmit will take care of that for us. 
@@ -310,64 +311,20 @@ private Thread startDriver(final RpcServer rpcServer, final String clientId, fin Preconditions.checkArgument(master != null, "spark.master is not defined."); String deployMode = conf.get("spark.submit.deployMode"); - List argv = Lists.newLinkedList(); - - if (sparkHome != null) { - argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath()); - } else { - LOG.info("No spark.home provided, calling SparkSubmit directly."); - argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath()); - - if (master.startsWith("local") || master.startsWith("mesos") || - SparkClientUtilities.isYarnClientMode(master, deployMode) || - master.startsWith("spark")) { - String mem = conf.get("spark.driver.memory"); - if (mem != null) { - argv.add("-Xms" + mem); - argv.add("-Xmx" + mem); - } - - String cp = conf.get("spark.driver.extraClassPath"); - if (cp != null) { - argv.add("-classpath"); - argv.add(cp); - } - - String libPath = conf.get("spark.driver.extraLibPath"); - if (libPath != null) { - argv.add("-Djava.library.path=" + libPath); - } - - String extra = conf.get(DRIVER_OPTS_KEY); - if (extra != null) { - for (String opt : extra.split("[ ]")) { - if (!opt.trim().isEmpty()) { - argv.add(opt.trim()); - } - } - } - } - - argv.add("org.apache.spark.deploy.SparkSubmit"); - } - if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) { String executorCores = conf.get("spark.executor.cores"); if (executorCores != null) { - argv.add("--executor-cores"); - argv.add(executorCores); + addExecutorCores(executorCores); } String executorMemory = conf.get("spark.executor.memory"); if (executorMemory != null) { - argv.add("--executor-memory"); - argv.add(executorMemory); + addExecutorMemory(executorMemory); } String numOfExecutors = conf.get("spark.executor.instances"); if (numOfExecutors != null) { - argv.add("--num-executors"); - argv.add(numOfExecutors); + addNumExecutors(numOfExecutors); } } // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh @@ -380,24 +337,9 @@ private Thread startDriver(final RpcServer rpcServer, final String clientId, fin String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), "0.0.0.0"); String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); + boolean isDoAsEnabled = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) { - if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { - List kinitArgv = Lists.newLinkedList(); - kinitArgv.add("kinit"); - kinitArgv.add(principal); - kinitArgv.add("-k"); - kinitArgv.add("-t"); - kinitArgv.add(keyTabFile + ";"); - kinitArgv.addAll(argv); - argv = kinitArgv; - } else { - // if doAs is not enabled, we pass the principal/keypad to spark-submit in order to - // support the possible delegation token renewal in Spark - argv.add("--principal"); - argv.add(principal); - argv.add("--keytab"); - argv.add(keyTabFile); - } + addKeytabAndPrincipal(isDoAsEnabled, keyTabFile, principal); } } if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { @@ -406,8 +348,7 @@ private Thread startDriver(final RpcServer rpcServer, final String clientId, fin // do not do impersonation in CLI mode if (!currentUser.equals(System.getProperty("user.name"))) { LOG.info("Attempting impersonation of " + currentUser); - argv.add("--proxy-user"); - argv.add(currentUser); + addProxyUser(currentUser); } } catch 
(Exception e) { String msg = "Cannot obtain username: " + e; @@ -417,108 +358,60 @@ private Thread startDriver(final RpcServer rpcServer, final String clientId, fin String regStr = conf.get("spark.kryo.registrator"); if (HIVE_KRYO_REG_NAME.equals(regStr)) { - argv.add("--jars"); - argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf)); + addJars(SparkClientUtilities.findKryoRegistratorJar(hiveConf)); } - argv.add("--properties-file"); - argv.add(properties.getAbsolutePath()); - argv.add("--class"); - argv.add(RemoteDriver.class.getName()); + addPropertiesFile(properties.getAbsolutePath()); + addClass(RemoteDriver.class.getName()); String jar = "spark-internal"; if (SparkContext.jarOfClass(this.getClass()).isDefined()) { jar = SparkContext.jarOfClass(this.getClass()).get(); } - argv.add(jar); + addExecutableJar(jar); - argv.add("--remote-host"); - argv.add(serverAddress); - argv.add("--remote-port"); - argv.add(serverPort); + addRemoteHost(serverAddress); + addRemotePort(serverPort); //hive.spark.* keys are passed down to the RemoteDriver via --conf, //as --properties-file contains the spark.* keys that are meant for SparkConf object. for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) { String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey); - argv.add("--conf"); - argv.add(String.format("%s=%s", hiveSparkConfKey, value)); + addAppArg(hiveSparkConfKey, value); } - String cmd = Joiner.on(" ").join(argv); - LOG.info("Running client driver with argv: {}", cmd); - ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd); + return launchDriver(isTesting, rpcServer, clientId); + } - // Prevent hive configurations from being visible in Spark. - pb.environment().remove("HIVE_HOME"); - pb.environment().remove("HIVE_CONF_DIR"); - // Add credential provider password to the child process's environment - // In case of Spark the credential provider location is provided in the jobConf when the job is submitted - String password = getSparkJobCredentialProviderPassword(); - if(password != null) { - pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password); - } - if (isTesting != null) { - pb.environment().put("SPARK_TESTING", isTesting); - } + protected abstract Future launchDriver(String isTesting, RpcServer rpcServer, String + clientId) throws IOException; - final Process child = pb.start(); - String threadName = Thread.currentThread().getName(); - final List childErrorLog = Collections.synchronizedList(new ArrayList()); - final LogRedirector.LogSourceCallback callback = () -> {return isAlive;}; + protected abstract void addSparkHome(String sparkHome); - LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName, - new LogRedirector(child.getInputStream(), LOG, callback)); - LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName, - new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback)); + protected abstract void addAppArg(String key, String value); - runnable = new Runnable() { - @Override - public void run() { - try { - int exitCode = child.waitFor(); - if (exitCode != 0) { - StringBuilder errStr = new StringBuilder(); - synchronized(childErrorLog) { - Iterator iter = childErrorLog.iterator(); - while(iter.hasNext()){ - errStr.append(iter.next()); - errStr.append('\n'); - } - } - - LOG.warn("Child process exited with code {}", exitCode); - rpcServer.cancelClient(clientId, - "Child process (spark-submit) exited before connecting back with error log " + errStr.toString()); - } - } catch (InterruptedException ie) 
{ - LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process."); - rpcServer.cancelClient(clientId, "Thread waiting on the child porcess (spark-submit) is interrupted"); - Thread.interrupted(); - child.destroy(); - } catch (Exception e) { - String errMsg = "Exception while waiting for child process (spark-submit)"; - LOG.warn(errMsg, e); - rpcServer.cancelClient(clientId, errMsg); - } - } - }; + protected abstract void addRemotePort(String serverPort); - Thread thread = new Thread(runnable); - thread.setDaemon(true); - thread.setName("Driver"); - thread.start(); - return thread; - } + protected abstract void addRemoteHost(String serverAddress); - private String getSparkJobCredentialProviderPassword() { - if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) { - return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD"); - } else if (conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) { - return conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD"); - } - return null; - } + protected abstract void addExecutableJar(String jar); + + protected abstract void addPropertiesFile(String absolutePath); + + protected abstract void addClass(String name); + + protected abstract void addJars(String jars); + + protected abstract void addProxyUser(String proxyUser); + + protected abstract void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, + String principal); + + protected abstract void addNumExecutors(String numOfExecutors); + + protected abstract void addExecutorMemory(String executorMemory); + + protected abstract void addExecutorCores(String executorCores); private class ClientProtocol extends BaseProtocol { @@ -526,7 +419,7 @@ private String getSparkJobCredentialProviderPassword() { final String jobId = UUID.randomUUID().toString(); final Promise promise = driverRpc.createPromise(); final JobHandleImpl handle = - new JobHandleImpl(SparkClientImpl.this, promise, jobId, listeners); + new JobHandleImpl(AbstractSparkClient.this, promise, jobId, listeners); jobs.put(jobId, handle); final io.netty.util.concurrent.Future rpc = driverRpc.call(new JobRequest(jobId, job)); diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java index 2881252b0e..53e789d9ea 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java @@ -34,7 +34,7 @@ */ class JobHandleImpl implements JobHandle { - private final SparkClientImpl client; + private final SparkClient client; private final String jobId; private final MetricsCollection metrics; private final Promise promise; @@ -43,8 +43,8 @@ private volatile State state; private volatile SparkCounters sparkCounters; - JobHandleImpl(SparkClientImpl client, Promise promise, String jobId, - List> listeners) { + JobHandleImpl(SparkClient client, Promise promise, String jobId, + List> listeners) { this.client = client; this.jobId = jobId; this.promise = promise; @@ -233,7 +233,7 @@ private void fireStateChange(State newState, Listener listener) { } } - /** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. */ + /** Last attempt at preventing stray jobs from accumulating in SparkSubmitSparkClient. 
*/ @Override protected void finalize() { if (!isDone()) { diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index e584cbb0a7..bc8f5aa093 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -71,6 +71,8 @@ public class RemoteDriver { private static final Logger LOG = LoggerFactory.getLogger(RemoteDriver.class); + public static final String REMOTE_PORT_KEY = "--remote-port"; + public static final String REMOTE_HOST_KEY = "--remote-host"; private final Map> activeJobs; private final Object jcLock; @@ -101,9 +103,9 @@ private RemoteDriver(String[] args) throws Exception { int serverPort = -1; for (int idx = 0; idx < args.length; idx += 2) { String key = args[idx]; - if (key.equals("--remote-host")) { + if (key.equals(REMOTE_HOST_KEY)) { serverAddress = getArg(args, idx); - } else if (key.equals("--remote-port")) { + } else if (key.equals(REMOTE_PORT_KEY)) { serverPort = Integer.parseInt(getArg(args, idx)); } else if (key.equals("--client-id")) { conf.set(SparkClientFactory.CONF_CLIENT_ID, getArg(args, idx)); diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java index 1922e412a1..f02b4b6029 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java @@ -110,4 +110,6 @@ * Check if remote context is still active. */ boolean isActive(); + + void cancel(String jobId); } diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index fd9b72583a..f596df5eaa 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -82,10 +82,16 @@ public static void stop() { * @param hiveConf Configuration for Hive, contains hive.* properties. 
*/ public static SparkClient createClient(Map sparkConf, HiveConf hiveConf, - String sessionId) - throws IOException, SparkException { + String sessionId) throws IOException { Preconditions.checkState(server != null, "initialize() not called."); - return new SparkClientImpl(server, sparkConf, hiveConf, sessionId); + switch (HiveConf.SparkClientType.valueOf( + hiveConf.getVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE))) { + case SPARK_SUBMIT_CLIENT: + return new SparkSubmitSparkClient(server, sparkConf, hiveConf, sessionId); + case SPARK_LAUNCHER_CLIENT: + return new SparkLauncherSparkClient(server, sparkConf, hiveConf, sessionId); + default: + return new SparkSubmitSparkClient(server, sparkConf, hiveConf, sessionId); + } } - } diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java new file mode 100644 index 0000000000..8331dcd150 --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkLauncherSparkClient.java @@ -0,0 +1,194 @@ +package org.apache.hive.spark.client; + +import com.google.common.collect.Sets; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.spark.client.rpc.RpcServer; + +import org.apache.spark.launcher.AbstractLauncher; +import org.apache.spark.launcher.InProcessLauncher; +import org.apache.spark.launcher.SparkAppHandle; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SparkLauncherSparkClient extends AbstractSparkClient { + + private AbstractLauncher sparkLauncher; + + SparkLauncherSparkClient(RpcServer rpcServer, + Map conf, + HiveConf hiveConf, + String sessionid) throws IOException { + super(rpcServer, conf, hiveConf, sessionid); + } + + @Override + protected Future launchDriver(String isTesting, RpcServer rpcServer, + String clientId) throws IOException { + if (isTesting != null) { + System.setProperty("spark.testing", "true"); + } + + return new SparkLauncherFuture(getSparkLauncher().startApplication()); + } + + @Override + protected void addSparkHome(String sparkHome) { + // Nothing to do + } + + @Override + protected void addAppArg(String key, String value) { + getSparkLauncher().addAppArgs("--conf", String.format("%s=%s", key, value)); + } + + @Override + protected void addRemotePort(String serverPort) { + getSparkLauncher().addAppArgs(RemoteDriver.REMOTE_PORT_KEY, serverPort); + } + + @Override + protected void addRemoteHost(String serverAddress) { + getSparkLauncher().addAppArgs(RemoteDriver.REMOTE_HOST_KEY, serverAddress); + } + + @Override + protected void addExecutableJar(String jar) { + getSparkLauncher().setAppResource(jar); + } + + @Override + protected void addPropertiesFile(String absolutePath) { + getSparkLauncher().setPropertiesFile(absolutePath); + } + + @Override + protected void addClass(String name) { + getSparkLauncher().setMainClass(name); + } + + @Override + protected void addJars(String jars) { + getSparkLauncher().addJar(jars); + } + + @Override + protected void addProxyUser(String proxyUser) { + getSparkLauncher().addSparkArg("--proxy-user", proxyUser); + } + + @Override + protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) { + getSparkLauncher().addSparkArg("--principal", principal); + 
getSparkLauncher().addSparkArg("--keytab", keyTabFile); + } + + @Override + protected void addNumExecutors(String numOfExecutors) { + getSparkLauncher().addSparkArg("--num-executors", numOfExecutors); + } + + @Override + protected void addExecutorMemory(String executorMemory) { + getSparkLauncher().addSparkArg("--executor-memory", executorMemory); + } + + @Override + protected void addExecutorCores(String executorCores) { + getSparkLauncher().addSparkArg("--executor-cores", executorCores); + } + + private AbstractLauncher getSparkLauncher() { + if (this.sparkLauncher == null) { + this.sparkLauncher = new InProcessLauncher(); + } + return this.sparkLauncher; + } + + private static class SparkLauncherFuture implements Future { + + private static final Set NOT_RUNNING_SPARK_STATES = Sets.newHashSet( + SparkAppHandle.State.FAILED, + SparkAppHandle.State.FINISHED, + SparkAppHandle.State.KILLED, + SparkAppHandle.State.LOST); + + private static final Set RUNNING_SPARK_STATES = Sets.newHashSet( + SparkAppHandle.State.RUNNING, + SparkAppHandle.State.FINISHED); + + private final SparkAppHandle sparkAppHandle; + + private SparkLauncherFuture(SparkAppHandle sparkAppHandle) { + this.sparkAppHandle = sparkAppHandle; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + this.sparkAppHandle.stop(); + return true; + } + + @Override + public boolean isCancelled() { + return NOT_RUNNING_SPARK_STATES.contains(this.sparkAppHandle.getState()); + } + + @Override + public boolean isDone() { + return RUNNING_SPARK_STATES.contains(this.sparkAppHandle.getState()); + } + + @Override + public Void get() throws InterruptedException { + try { + return get(-1, null); + } catch (TimeoutException e) { + // Should never be thrown + } + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + if (RUNNING_SPARK_STATES.contains(this.sparkAppHandle.getState())) { + return null; + } + final CountDownLatch shutdownLatch = new CountDownLatch(1); + this.sparkAppHandle.addListener(new SparkAppHandle.Listener() { + @Override + public void stateChanged(SparkAppHandle sparkAppHandle) { + if (RUNNING_SPARK_STATES.contains(sparkAppHandle.getState())) { + shutdownLatch.countDown(); + } + } + + @Override + public void infoChanged(SparkAppHandle sparkAppHandle) { + // Do nothing + } + }); + if (RUNNING_SPARK_STATES.contains(this.sparkAppHandle.getState())) { + return null; + } else { + if (timeout > 0) { + if (shutdownLatch.await(timeout, unit)) { + return null; + } else { + throw new TimeoutException("Spark Application did not reach running state within " + + "allotted timeout of " + timeout + " " + unit.toString()); + } + } else { + shutdownLatch.await(); + } + } + return null; + } + } +} diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java new file mode 100644 index 0000000000..fcc1e5ab0a --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.spark.client; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.common.log.LogRedirector; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.spark.client.rpc.RpcServer; +import org.apache.spark.SparkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SparkSubmitSparkClient extends AbstractSparkClient { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(SparkSubmitSparkClient.class); + + private static final String DRIVER_OPTS_KEY = "spark.driver.extraJavaOptions"; + + private List argv; + + SparkSubmitSparkClient(RpcServer rpcServer, Map conf, HiveConf hiveConf, + String sessionid) throws IOException { + super(rpcServer, conf, hiveConf, sessionid); + } + + @Override + protected void addSparkHome(String sparkHome) { + // Define how to pass options to the child process. If launching in client (or local) + // mode, the driver options need to be passed directly on the command line. Otherwise, + // SparkSubmit will take care of that for us. 
+ String master = conf.get("spark.master"); + Preconditions.checkArgument(master != null, "spark.master is not defined."); + String deployMode = conf.get("spark.submit.deployMode"); + + argv = Lists.newLinkedList(); + + if (sparkHome != null) { + argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath()); + } else { + LOG.info("No spark.home provided, calling SparkSubmit directly."); + argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath()); + + if (master.startsWith("local") || master.startsWith("mesos") || + SparkClientUtilities.isYarnClientMode(master, deployMode) || + master.startsWith("spark")) { + String mem = conf.get("spark.driver.memory"); + if (mem != null) { + argv.add("-Xms" + mem); + argv.add("-Xmx" + mem); + } + + String cp = conf.get("spark.driver.extraClassPath"); + if (cp != null) { + argv.add("-classpath"); + argv.add(cp); + } + + String libPath = conf.get("spark.driver.extraLibPath"); + if (libPath != null) { + argv.add("-Djava.library.path=" + libPath); + } + + String extra = conf.get(DRIVER_OPTS_KEY); + if (extra != null) { + for (String opt : extra.split("[ ]")) { + if (!opt.trim().isEmpty()) { + argv.add(opt.trim()); + } + } + } + } + + argv.add("org.apache.spark.deploy.SparkSubmit"); + } + } + + @Override + protected void addAppArg(String key, String value) { + argv.add("--conf"); + argv.add(String.format("%s=%s", key, value)); + } + + @Override + protected void addRemotePort(String serverPort) { + argv.add("--remote-port"); + argv.add(serverPort); + } + + @Override + protected void addRemoteHost(String serverAddress) { + argv.add("--remote-host"); + argv.add(serverAddress); + } + + @Override + protected void addExecutableJar(String jar) { + argv.add(jar); + } + + @Override + protected void addPropertiesFile(String absolutePath) { + argv.add("--properties-file"); + argv.add(absolutePath); + } + + @Override + protected void addClass(String name) { + argv.add("--class"); + argv.add(RemoteDriver.class.getName()); + } + + @Override + protected void addJars(String jars) { + argv.add("--jars"); + argv.add(jars); + } + + @Override + protected void addProxyUser(String proxyUser) { + argv.add("--proxy-user"); + argv.add(proxyUser); + } + + @Override + protected void addKeytabAndPrincipal(boolean isDoAsEnabled, String keyTabFile, String principal) { + if (isDoAsEnabled) { + List kinitArgv = Lists.newLinkedList(); + kinitArgv.add("kinit"); + kinitArgv.add(principal); + kinitArgv.add("-k"); + kinitArgv.add("-t"); + kinitArgv.add(keyTabFile + ";"); + kinitArgv.addAll(argv); + argv = kinitArgv; + } else { + // if doAs is not enabled, we pass the principal/keypad to spark-submit in order to + // support the possible delegation token renewal in Spark + argv.add("--principal"); + argv.add(principal); + argv.add("--keytab"); + argv.add(keyTabFile); + } + } + + @Override + protected void addNumExecutors(String numOfExecutors) { + argv.add("--num-executors"); + argv.add(numOfExecutors); + } + + @Override + protected void addExecutorMemory(String executorMemory) { + argv.add("--executor-memory"); + argv.add(executorMemory); + } + + @Override + protected void addExecutorCores(String executorCores) { + argv.add("--executor-cores"); + argv.add(executorCores); + } + + private String getSparkJobCredentialProviderPassword() { + if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) { + return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD"); + } else if (conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) { + 
return conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD"); + } + return null; + } + + @Override + protected Future launchDriver(String isTesting, RpcServer rpcServer, String clientId) throws + IOException { + Callable runnable; + + String cmd = Joiner.on(" ").join(argv); + LOG.info("Running client driver with argv: {}", cmd); + ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd); + + // Prevent hive configurations from being visible in Spark. + pb.environment().remove("HIVE_HOME"); + pb.environment().remove("HIVE_CONF_DIR"); + // Add credential provider password to the child process's environment + // In case of Spark the credential provider location is provided in the jobConf when the job is submitted + String password = getSparkJobCredentialProviderPassword(); + if(password != null) { + pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password); + } + if (isTesting != null) { + pb.environment().put("SPARK_TESTING", isTesting); + } + + final Process child = pb.start(); + String threadName = Thread.currentThread().getName(); + final List childErrorLog = Collections.synchronizedList(new ArrayList()); + final LogRedirector.LogSourceCallback callback = () -> {return isAlive;}; + + LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName, + new LogRedirector(child.getInputStream(), LOG, callback)); + LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName, + new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback)); + + runnable = new Callable() { + @Override + public Void call() { + try { + int exitCode = child.waitFor(); + if (exitCode != 0) { + StringBuilder errStr = new StringBuilder(); + synchronized(childErrorLog) { + Iterator iter = childErrorLog.iterator(); + while(iter.hasNext()){ + errStr.append(iter.next()); + errStr.append('\n'); + } + } + + LOG.warn("Child process exited with code {}", exitCode); + rpcServer.cancelClient(clientId, + "Child process (spark-submit) exited before connecting back with error log " + errStr.toString()); + } + } catch (InterruptedException ie) { + LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process."); + rpcServer.cancelClient(clientId, "Thread waiting on the child porcess (spark-submit) is interrupted"); + Thread.interrupted(); + child.destroy(); + } catch (Exception e) { + String errMsg = "Exception while waiting for child process (spark-submit)"; + LOG.warn(errMsg, e); + rpcServer.cancelClient(clientId, errMsg); + } + return null; + } + }; + + ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon + (true).setNameFormat("Driver").build()); + + return es.submit(runnable); + } +} diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java index d6b627b630..2964df6856 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java @@ -32,7 +32,7 @@ @RunWith(MockitoJUnitRunner.class) public class TestJobHandle { - @Mock private SparkClientImpl client; + @Mock private SparkSubmitSparkClient client; @Mock private Promise promise; @Mock private JobHandle.Listener listener; @Mock private JobHandle.Listener listener2;