diff --git a/common/src/java/org/apache/hadoop/hive/common/ProcessRunner.java b/common/src/java/org/apache/hadoop/hive/common/ProcessRunner.java
new file mode 100644
index 0000000..4f41e8e
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/ProcessRunner.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.base.Joiner;
+
+import org.slf4j.Logger;
+
+
+public class ProcessRunner {
+
+  private static final long MAX_ERR_LOG_LINES = 1000;
+  private final Logger log;
+
+  public ProcessRunner(Logger log) {
+    this.log = log;
+  }
+
+  public void run(List<String> args) throws IOException, InterruptedException {
+    String cmd = Joiner.on(" ").join(args);
+    log.info("Running command with argv: {}", cmd);
+    ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
+
+    final Process child = pb.start();
+    final List<String> childErrorLog = new ArrayList<>();
+
+    String uuid = UUID.randomUUID().toString();
+    redirect("stdout-redir-" + uuid, new Redirector(child.getInputStream()));
+    redirect("stderr-redir-" + uuid, new Redirector(child.getErrorStream(), childErrorLog));
+
+    try {
+      int exitCode = child.waitFor();
+      if (exitCode != 0) {
+        StringBuilder errStr = new StringBuilder();
+        for (String s : childErrorLog) {
+          errStr.append(s);
+          errStr.append('\n');
+        }
+        log.warn("Child process exited with error log " + errStr.toString());
+        log.warn("Child process exited with code {}", exitCode);
+      }
+    } catch (InterruptedException ie) {
+      log.warn("Waiting thread interrupted, killing child process.");
+      Thread.interrupted();
+      child.destroy();
+    } catch (Exception e) {
+      log.warn("Exception while waiting for child process.", e);
+    }
+  }
+
+  private void redirect(String name, Redirector redirector) {
+    Thread thread = new Thread(redirector);
+    thread.setName(name);
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+  private class Redirector implements Runnable {
+
+    private final BufferedReader in;
+    private List<String> errLogs;
+    private int numErrLogLines = 0;
+
+    Redirector(InputStream in) {
+      this.in = new BufferedReader(new InputStreamReader(in));
+    }
+
+    Redirector(InputStream in, List<String> errLogs) {
+      this.in = new BufferedReader(new InputStreamReader(in));
+      this.errLogs = errLogs;
+    }
+
+    @Override
+    public void run() {
+      try {
+        String line = null;
+        while ((line = in.readLine()) != null) {
+          log.info(line);
+          if (errLogs != null) {
+            if (numErrLogLines++ < MAX_ERR_LOG_LINES) {
+              errLogs.add(line);
+            }
+          }
+        }
+      } catch (Exception e) {
+        log.warn("Error in redirector thread.", e);
+      }
+    }
+  }
+}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index ede8ce9..4abb8da 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -68,6 +68,13 @@
 @InterfaceAudience.Private
 public class RemoteDriver {
 
+  static final String REMOTE_HOST_KEY = "--remote-host";
+  static final String REMOTE_PORT_KEY = "--remote-port";
+  static final String CONF_KEY = "--conf";
+
+  private static final String CLIENT_ID_KEY = "--client-id";
+  private static final String SECRET_KEY = "--secret";
+
   private static final Logger LOG = LoggerFactory.getLogger(RemoteDriver.class);
 
   private final Map<String, JobWrapper<?>> activeJobs;
@@ -99,15 +106,15 @@ private RemoteDriver(String[] args) throws Exception {
     int serverPort = -1;
     for (int idx = 0; idx < args.length; idx += 2) {
       String key = args[idx];
-      if (key.equals("--remote-host")) {
+      if (key.equals(REMOTE_HOST_KEY)) {
         serverAddress = getArg(args, idx);
-      } else if (key.equals("--remote-port")) {
+      } else if (key.equals(REMOTE_PORT_KEY)) {
         serverPort = Integer.parseInt(getArg(args, idx));
-      } else if (key.equals("--client-id")) {
+      } else if (key.equals(CLIENT_ID_KEY)) {
         conf.set(SparkClientFactory.CONF_CLIENT_ID, getArg(args, idx));
-      } else if (key.equals("--secret")) {
+      } else if (key.equals(SECRET_KEY)) {
         conf.set(SparkClientFactory.CONF_KEY_SECRET, getArg(args, idx));
-      } else if (key.equals("--conf")) {
+      } else if (key.equals(CONF_KEY)) {
         String[] val = getArg(args, idx).split("[=]", 2);
         conf.set(val[0], val[1]);
       } else {
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriverLocalRunner.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriverLocalRunner.java
new file mode 100644
index 0000000..2055368
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriverLocalRunner.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.launcher.SparkAppHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class RemoteDriverLocalRunner implements SparkAppHandle {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteDriverLocalRunner.class);
+
+  private Thread thread;
+
+  RemoteDriverLocalRunner(Map<String, String> conf, String serverAddress, String serverPort,
+      String clientId, String secret) {
+    thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        List<String> args = Lists.newArrayList();
+        args.add("--remote-host");
+        args.add(serverAddress);
+        args.add("--remote-port");
+        args.add(serverPort);
+        args.add("--client-id");
+        args.add(clientId);
+        args.add("--secret");
+        args.add(secret);
+
+        for (Map.Entry<String, String> e : conf.entrySet()) {
+          args.add("--conf");
+          args.add(String.format("%s=%s", e.getKey(), conf.get(e.getKey())));
+        }
+        try {
+          RemoteDriver.main(args.toArray(new String[args.size()]));
+        } catch (Exception e) {
+          LOG.error("Error running driver.", e);
+        }
+      }
+
+    });
+    thread.setDaemon(true);
+    thread.setName("Driver");
+  }
+
+  @Override
+  public void addListener(Listener listener) {
+
+  }
+
+  @Override
+  public State getState() {
+    return null;
+  }
+
+  @Override
+  public String getAppId() {
+    return null;
+  }
+
+  @Override
+  public void stop() {
+    try {
+      thread.join();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void kill() {
+    thread.interrupt();
+  }
+
+  @Override
+  public void disconnect() {
+    try {
+      thread.join();
+    } catch (InterruptedException e) {
+      LOG.error("RemoteDriver thread failed to stop", e);
+    }
+  }
+
+  void run() {
+    thread.start();
+  }
+}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index d4b63f0..e8ee20f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -24,36 +24,39 @@
 import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.io.Resources;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
-import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.io.Writer;
 import java.net.URI;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.hive.common.ProcessRunner;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -62,18 +65,23 @@
 import org.apache.hive.spark.client.rpc.Rpc;
 import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.client.rpc.RpcServer;
+
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkException;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.apache.spark.launcher.SparkLauncher;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+
 class SparkClientImpl implements SparkClient {
+  private static final long serialVersionUID = 1L;
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
 
   private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
-  private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000;
 
   private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
@@ -85,22 +93,21 @@
   private final Map<String, String> conf;
   private final HiveConf hiveConf;
-  private final AtomicInteger childIdGenerator;
-  private final Thread driverThread;
+  private final SparkAppHandle sparkAppHandle;
   private final Map<String, JobHandleImpl<?>> jobs;
   private final Rpc driverRpc;
   private final ClientProtocol protocol;
   private volatile boolean isAlive;
 
-  SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf) throws IOException, SparkException {
+  SparkClientImpl(RpcServer rpcServer, Map<String, String> conf,
+      HiveConf hiveConf) throws IOException, SparkException, InterruptedException {
     this.conf = conf;
     this.hiveConf = hiveConf;
-    this.childIdGenerator = new AtomicInteger();
     this.jobs = Maps.newConcurrentMap();
 
     String clientId = UUID.randomUUID().toString();
     String secret = rpcServer.createSecret();
-    this.driverThread = startDriver(rpcServer, clientId, secret);
+    this.sparkAppHandle = startDriver(rpcServer, clientId, secret);
     this.protocol = new ClientProtocol();
 
     try {
@@ -109,29 +116,23 @@
     } catch (Throwable e) {
       if (e.getCause() instanceof TimeoutException) {
         LOG.error("Timed out waiting for client to connect.\nPossible reasons include network " +
-            "issues, errors in remote driver or the cluster has no available resources, etc." +
-            "\nPlease check YARN or Spark driver's logs for further information.", e);
+          "issues, errors in remote driver or the cluster has no available resources, etc." +
+          "\nPlease check YARN or Spark driver's logs for further information.", e);
       } else {
         LOG.error("Error while waiting for client to connect.", e);
       }
-      driverThread.interrupt();
-      try {
-        driverThread.join();
-      } catch (InterruptedException ie) {
-        // Give up.
-        LOG.debug("Interrupted before driver thread was finished.");
-      }
+      shutdownSparkHandle();
       throw Throwables.propagate(e);
     }
 
     driverRpc.addListener(new Rpc.Listener() {
-        @Override
-        public void rpcClosed(Rpc rpc) {
-          if (isAlive) {
-            LOG.warn("Client RPC channel closed unexpectedly.");
-            isAlive = false;
-          }
+      @Override
+      public void rpcClosed(Rpc rpc) {
+        if (isAlive) {
+          LOG.warn("Client RPC channel closed unexpectedly.");
+          isAlive = false;
         }
+      }
     });
     isAlive = true;
   }
@@ -164,16 +165,7 @@ public void stop() {
       }
     }
 
-    long endTime = System.currentTimeMillis() + DEFAULT_SHUTDOWN_TIMEOUT;
-    try {
-      driverThread.join(DEFAULT_SHUTDOWN_TIMEOUT);
-    } catch (InterruptedException ie) {
-      LOG.debug("Interrupted before driver thread was finished.");
-    }
-    if (endTime - System.currentTimeMillis() <= 0) {
-      LOG.warn("Timed out shutting down remote driver, interrupting...");
-      driverThread.interrupt();
-    }
+    shutdownSparkHandle();
   }
 
   @Override
@@ -205,9 +197,9 @@ void cancel(String jobId) {
     protocol.cancel(jobId);
   }
 
-  private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret)
-      throws IOException {
-    Runnable runnable;
+  private SparkAppHandle startDriver(final RpcServer rpcServer, final String clientId, final String secret)
+      throws IOException, InterruptedException {
+
     final String serverAddress = rpcServer.getAddress();
     final String serverPort = String.valueOf(rpcServer.getPort());
@@ -215,34 +207,10 @@ private Thread startDriver(final RpcServer rpcServer, final String clientId, fin
       // Mostly for testing things quickly. Do not do this in production.
       // when invoked in-process it inherits the environment variables of the parent
       LOG.warn("!!!! Running remote driver in-process. !!!!");
-      runnable = new Runnable() {
-        @Override
-        public void run() {
-          List<String> args = Lists.newArrayList();
-          args.add("--remote-host");
-          args.add(serverAddress);
-          args.add("--remote-port");
-          args.add(serverPort);
-          args.add("--client-id");
-          args.add(clientId);
-          args.add("--secret");
-          args.add(secret);
-
-          for (Map.Entry<String, String> e : conf.entrySet()) {
-            args.add("--conf");
-            args.add(String.format("%s=%s", e.getKey(), conf.get(e.getKey())));
-          }
-          try {
-            RemoteDriver.main(args.toArray(new String[args.size()]));
-          } catch (Exception e) {
-            LOG.error("Error running driver.", e);
-          }
-        }
-      };
+      RemoteDriverLocalRunner remoteDriverLocalRunner = new RemoteDriverLocalRunner(conf, serverAddress, serverPort, clientId, secret);
+      remoteDriverLocalRunner.run();
+      return remoteDriverLocalRunner;
     } else {
-      // If a Spark installation is provided, use the spark-submit script. Otherwise, call the
-      // SparkSubmit class directly, which has some caveats (like having to provide a proper
-      // version of Guava on the classpath depending on the deploy mode).
       String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY));
       if (sparkHome == null) {
         sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV));
       }
@@ -265,9 +233,9 @@ public void run() {
       }
 
       String driverJavaOpts = Joiner.on(" ").skipNulls().join(
-          "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
+        "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
       String executorJavaOpts = Joiner.on(" ").skipNulls().join(
-          "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
+        "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
 
       // Create a file with all the job properties to be read by spark-submit. Change the
       // file's permissions so that only the owner can read it. This avoid having the
@@ -303,19 +271,21 @@ public void run() {
       if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
         String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
         if (!hiveHadoopTestClasspath.isEmpty()) {
-          String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
+          String extraDriverClasspath = Strings.nullToEmpty((String) allProps.get(DRIVER_EXTRA_CLASSPATH));
           if (extraDriverClasspath.isEmpty()) {
             allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
           } else {
-            extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
+            extraDriverClasspath = extraDriverClasspath.endsWith(
+                File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
             allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
           }
-          String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
+          String extraExecutorClasspath = Strings.nullToEmpty((String) allProps.get(EXECUTOR_EXTRA_CLASSPATH));
           if (extraExecutorClasspath.isEmpty()) {
             allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
           } else {
-            extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
+            extraExecutorClasspath = extraExecutorClasspath.endsWith(
+                File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
             allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
           }
         }
@@ -335,64 +305,35 @@ public void run() {
       Preconditions.checkArgument(master != null, "spark.master is not defined.");
       String deployMode = conf.get("spark.submit.deployMode");
 
-      List<String> argv = Lists.newLinkedList();
-
-      if (sparkHome != null) {
-        argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
-      } else {
-        LOG.info("No spark.home provided, calling SparkSubmit directly.");
-        argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
-
-        if (master.startsWith("local") || master.startsWith("mesos") ||
-            SparkClientUtilities.isYarnClientMode(master, deployMode) ||
-            master.startsWith("spark")) {
-          String mem = conf.get("spark.driver.memory");
-          if (mem != null) {
-            argv.add("-Xms" + mem);
-            argv.add("-Xmx" + mem);
-          }
-
-          String cp = conf.get("spark.driver.extraClassPath");
-          if (cp != null) {
-            argv.add("-classpath");
-            argv.add(cp);
-          }
-
-          String libPath = conf.get("spark.driver.extraLibPath");
-          if (libPath != null) {
-            argv.add("-Djava.library.path=" + libPath);
-          }
-
-          String extra = conf.get(DRIVER_OPTS_KEY);
-          if (extra != null) {
-            for (String opt : extra.split("[ ]")) {
-              if (!opt.trim().isEmpty()) {
-                argv.add(opt.trim());
-              }
-            }
-          }
-        }
-
-        argv.add("org.apache.spark.deploy.SparkSubmit");
+      // Add credential provider password to the child process's environment
+      // In case of Spark the credential provider location is provided in the jobConf when the job is submitted
+      String password = getSparkJobCredentialProviderPassword();
+      ImmutableMap.Builder<String, String> env =
+          new ImmutableMap.Builder<>();
+      if (password != null) {
+        env.put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
+      }
+      if (isTesting != null) {
+        env.put("SPARK_TESTING", isTesting);
       }
+      SparkLauncher sparkLauncher = new SparkLauncher(env.build());
+      sparkLauncher.setSparkHome(sparkHome);
+
       if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
         String executorCores = conf.get("spark.executor.cores");
         if (executorCores != null) {
-          argv.add("--executor-cores");
-          argv.add(executorCores);
+          sparkLauncher.addSparkArg("--executor-cores", executorCores);
         }
 
         String executorMemory = conf.get("spark.executor.memory");
         if (executorMemory != null) {
-          argv.add("--executor-memory");
-          argv.add(executorMemory);
+          sparkLauncher.addSparkArg("--executor-memory", executorMemory);
         }
 
         String numOfExecutors = conf.get("spark.executor.instances");
         if (numOfExecutors != null) {
-          argv.add("--num-executors");
-          argv.add(numOfExecutors);
+          sparkLauncher.addSparkArg("--num-executors", numOfExecutors);
         }
       }
       // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh
@@ -403,24 +344,15 @@ public void run() {
       // long-running application.
       if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) {
         String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
-            "0.0.0.0");
+          "0.0.0.0");
         String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
         if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
-          List<String> kinitArgv = Lists.newLinkedList();
-          kinitArgv.add("kinit");
-          kinitArgv.add(principal);
-          kinitArgv.add("-k");
-          kinitArgv.add("-t");
-          kinitArgv.add(keyTabFile + ";");
-          kinitArgv.addAll(argv);
-          argv = kinitArgv;
+          runKinit(principal, keyTabFile);
         } else {
           // if doAs is not enabled, we pass the principal/keypad to spark-submit in order to
           // support the possible delegation token renewal in Spark
-          argv.add("--principal");
-          argv.add(principal);
-          argv.add("--keytab");
-          argv.add(keyTabFile);
+          sparkLauncher.addSparkArg("--principal", principal);
+          sparkLauncher.addSparkArg("--keytab", keyTabFile);
         }
       }
       if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
@@ -429,8 +361,7 @@ public void run() {
         // do not do impersonation in CLI mode
         if (!currentUser.equals(System.getProperty("user.name"))) {
           LOG.info("Attempting impersonation of " + currentUser);
-          argv.add("--proxy-user");
-          argv.add(currentUser);
+          sparkLauncher.addSparkArg("--proxy-user", currentUser);
         }
       } catch (Exception e) {
         String msg = "Cannot obtain username: " + e;
@@ -438,85 +369,40 @@ public void run() {
-      argv.add("--properties-file");
-      argv.add(properties.getAbsolutePath());
-      argv.add("--class");
-      argv.add(RemoteDriver.class.getName());
+      sparkLauncher.setPropertiesFile(properties.getAbsolutePath());
+      sparkLauncher.setMainClass(RemoteDriver.class.getName());
 
       String jar = "spark-internal";
       if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
         jar = SparkContext.jarOfClass(this.getClass()).get();
       }
-      argv.add(jar);
+      sparkLauncher.setAppResource(jar);
 
-      argv.add("--remote-host");
-      argv.add(serverAddress);
-      argv.add("--remote-port");
-      argv.add(serverPort);
+      sparkLauncher.addAppArgs(RemoteDriver.REMOTE_HOST_KEY, serverAddress);
+      sparkLauncher.addAppArgs(RemoteDriver.REMOTE_PORT_KEY, serverPort);
 
       //hive.spark.* keys are passed down to the RemoteDriver via --conf,
       //as --properties-file contains the spark.* keys that are meant for SparkConf object.
       for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
         String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
-        argv.add("--conf");
-        argv.add(String.format("%s=%s", hiveSparkConfKey, value));
-      }
-
-      String cmd = Joiner.on(" ").join(argv);
-      LOG.info("Running client driver with argv: {}", cmd);
-      ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
-
-      // Prevent hive configurations from being visible in Spark.
-      pb.environment().remove("HIVE_HOME");
-      pb.environment().remove("HIVE_CONF_DIR");
-      // Add credential provider password to the child process's environment
-      // In case of Spark the credential provider location is provided in the jobConf when the job is submitted
-      String password = getSparkJobCredentialProviderPassword();
-      if(password != null) {
-        pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
-      }
-      if (isTesting != null) {
-        pb.environment().put("SPARK_TESTING", isTesting);
+        if (value != null) {
+          sparkLauncher.addAppArgs(RemoteDriver.CONF_KEY, String.format("%s=%s", hiveSparkConfKey, value));
+        }
       }
 
-      final Process child = pb.start();
-      int childId = childIdGenerator.incrementAndGet();
-      final List<String> childErrorLog = new ArrayList<String>();
-      redirect("stdout-redir-" + childId, new Redirector(child.getInputStream()));
-      redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog));
-
-      runnable = new Runnable() {
-        @Override
-        public void run() {
-          try {
-            int exitCode = child.waitFor();
-            if (exitCode != 0) {
-              StringBuilder errStr = new StringBuilder();
-              for (String s : childErrorLog) {
-                errStr.append(s);
-                errStr.append('\n');
-              }
-
-              rpcServer.cancelClient(clientId,
-                  "Child process exited before connecting back with error log " + errStr.toString());
-              LOG.warn("Child process exited with code {}", exitCode);
-            }
-          } catch (InterruptedException ie) {
-            LOG.warn("Waiting thread interrupted, killing child process.");
-            Thread.interrupted();
-            child.destroy();
-          } catch (Exception e) {
-            LOG.warn("Exception while waiting for child process.", e);
-          }
-        }
-      };
+      LOG.info("Running client driver");
+      return sparkLauncher.startApplication();
     }
+  }
 
-    Thread thread = new Thread(runnable);
-    thread.setDaemon(true);
-    thread.setName("Driver");
-    thread.start();
-    return thread;
+  private void runKinit(String principal, String keyTabFile) throws IOException, InterruptedException {
+    List<String> kinitArgv = Lists.newLinkedList();
+    kinitArgv.add("kinit");
+    kinitArgv.add(principal);
+    kinitArgv.add("-k");
+    kinitArgv.add("-t");
+    kinitArgv.add(keyTabFile + ";");
+    new ProcessRunner(LOG).run(kinitArgv);
   }
 
   private String getSparkJobCredentialProviderPassword() {
@@ -528,11 +414,49 @@ private String getSparkJobCredentialProviderPassword() {
     return null;
   }
 
-  private void redirect(String name, Redirector redirector) {
-    Thread thread = new Thread(redirector);
-    thread.setName(name);
-    thread.setDaemon(true);
-    thread.start();
+  /**
+   * TODO may need to revisit the logic here
+   */
+  private void shutdownSparkHandle() {
+    Set<SparkAppHandle.State> notRunningSparkStates = Sets.newHashSet(SparkAppHandle.State.FAILED,
+        SparkAppHandle.State.FINISHED, SparkAppHandle.State.KILLED, SparkAppHandle.State.LOST);
+
+    final Lock shutdownLock = new ReentrantLock();
+    Condition shutdownCondition = shutdownLock.newCondition();
+
+    this.sparkAppHandle.addListener(new SparkAppHandle.Listener() {
+      @Override
+      public void stateChanged(SparkAppHandle sparkAppHandle) {
+        if (notRunningSparkStates.contains(sparkAppHandle.getState())) {
+          shutdownLock.lock();
+          try {
+            shutdownCondition.signal();
+          } finally {
+            shutdownLock.unlock();
+          }
+        }
+      }
+
+      @Override
+      public void infoChanged(SparkAppHandle sparkAppHandle) {
+        // Do nothing
+      }
+    });
+
+    if (!notRunningSparkStates.contains(this.sparkAppHandle.getState())) {
+      this.sparkAppHandle.stop();
+      shutdownLock.lock();
+      try {
+        if (!shutdownCondition.await(DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) {
+          this.sparkAppHandle.kill();
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for Spark job to shutdown", e);
+        Thread.currentThread().interrupt();
+      } finally {
+        shutdownLock.unlock();
+      }
+    }
   }
 
   private class ClientProtocol extends BaseProtocol {
@@ -639,48 +563,6 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
     }
 
-  private class Redirector implements Runnable {
-
-    private final BufferedReader in;
-    private List<String> errLogs;
-    private int numErrLogLines = 0;
-
-    Redirector(InputStream in) {
-      this.in = new BufferedReader(new InputStreamReader(in));
-    }
-
-    Redirector(InputStream in, List<String> errLogs) {
-      this.in = new BufferedReader(new InputStreamReader(in));
-      this.errLogs = errLogs;
-    }
-
-    @Override
-    public void run() {
-      try {
-        String line = null;
-        while ((line = in.readLine()) != null) {
-          LOG.info(line);
-          if (errLogs != null) {
-            if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) {
-              errLogs.add(line);
-            }
-          }
-        }
-      } catch (IOException e) {
-        if (isAlive) {
-          LOG.warn("I/O error in redirector thread.", e);
-        } else {
-          // When stopping the remote driver the process might be destroyed during reading from the stream.
-          // We should not log the related exceptions in a visible level as they might mislead the user.
-          LOG.debug("I/O error in redirector thread while stopping the remote driver", e);
-        }
-      } catch (Exception e) {
-        LOG.warn("Error in redirector thread.", e);
-      }
-    }
-
-  }
-
   private static class AddJarJob implements Job<Serializable> {
 
     private static final long serialVersionUID = 1L;