commit c4e7e7f5856b73a8b23aa2dfccc06a8556067e8a Author: Sahil Takiar Date: Fri Jan 5 11:29:36 2018 -0800 HIVE-18214: Flaky test: TestSparkClient diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index 50c7bb20c4..8abeed82c5 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -34,9 +34,6 @@ @InterfaceAudience.Private public final class SparkClientFactory { - /** Used to run the driver in-process, mostly for testing. */ - static final String CONF_KEY_IN_PROCESS = "spark.client.do_not_use.run_driver_in_process"; - /** Used by client and driver to share a client ID for establishing an RPC session. */ static final String CONF_CLIENT_ID = "spark.client.authentication.client_id"; diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 49b7deb5ee..eed8e53fdd 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -214,324 +214,294 @@ private Thread startDriver(final RpcServer rpcServer, final String clientId, fin final String serverAddress = rpcServer.getAddress(); final String serverPort = String.valueOf(rpcServer.getPort()); - if (conf.containsKey(SparkClientFactory.CONF_KEY_IN_PROCESS)) { - // Mostly for testing things quickly. Do not do this in production. - // when invoked in-process it inherits the environment variables of the parent - LOG.warn("!!!! Running remote driver in-process. !!!!"); - runnable = new Runnable() { - @Override - public void run() { - List args = Lists.newArrayList(); - args.add("--remote-host"); - args.add(serverAddress); - args.add("--remote-port"); - args.add(serverPort); - args.add("--client-id"); - args.add(clientId); - args.add("--secret"); - args.add(secret); - - for (Map.Entry e : conf.entrySet()) { - args.add("--conf"); - args.add(String.format("%s=%s", e.getKey(), conf.get(e.getKey()))); - } - try { - RemoteDriver.main(args.toArray(new String[args.size()])); - } catch (Exception e) { - LOG.error("Error running driver.", e); - } - } - }; - } else { - // If a Spark installation is provided, use the spark-submit script. Otherwise, call the - // SparkSubmit class directly, which has some caveats (like having to provide a proper - // version of Guava on the classpath depending on the deploy mode). - String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY)); - if (sparkHome == null) { - sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV)); - } + // If a Spark installation is provided, use the spark-submit script. Otherwise, call the + // SparkSubmit class directly, which has some caveats (like having to provide a proper + // version of Guava on the classpath depending on the deploy mode). 
+ String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY)); + if (sparkHome == null) { + sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV)); + } + if (sparkHome == null) { + sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY)); + } + String sparkLogDir = conf.get("hive.spark.log.dir"); + if (sparkLogDir == null) { if (sparkHome == null) { - sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY)); - } - String sparkLogDir = conf.get("hive.spark.log.dir"); - if (sparkLogDir == null) { - if (sparkHome == null) { - sparkLogDir = "./target/"; - } else { - sparkLogDir = sparkHome + "/logs/"; - } + sparkLogDir = "./target/"; + } else { + sparkLogDir = sparkHome + "/logs/"; } + } - String osxTestOpts = ""; - if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) { - osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS)); - } + String osxTestOpts = ""; + if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) { + osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS)); + } - String driverJavaOpts = Joiner.on(" ").skipNulls().join( - "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY)); - String executorJavaOpts = Joiner.on(" ").skipNulls().join( - "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY)); - - // Create a file with all the job properties to be read by spark-submit. Change the - // file's permissions so that only the owner can read it. This avoid having the - // connection secret show up in the child process's command line. - File properties = File.createTempFile("spark-submit.", ".properties"); - if (!properties.setReadable(false) || !properties.setReadable(true, true)) { - throw new IOException("Cannot change permissions of job properties file."); - } - properties.deleteOnExit(); + String driverJavaOpts = Joiner.on(" ").skipNulls().join( + "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY)); + String executorJavaOpts = Joiner.on(" ").skipNulls().join( + "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY)); + + // Create a file with all the job properties to be read by spark-submit. Change the + // file's permissions so that only the owner can read it. This avoid having the + // connection secret show up in the child process's command line. 
+ File properties = File.createTempFile("spark-submit.", ".properties"); + if (!properties.setReadable(false) || !properties.setReadable(true, true)) { + throw new IOException("Cannot change permissions of job properties file."); + } + properties.deleteOnExit(); - Properties allProps = new Properties(); - // first load the defaults from spark-defaults.conf if available - try { - URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf"); - if (sparkDefaultsUrl != null) { - LOG.info("Loading spark defaults: " + sparkDefaultsUrl); - allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl))); - } - } catch (Exception e) { - String msg = "Exception trying to load spark-defaults.conf: " + e; - throw new IOException(msg, e); - } - // then load the SparkClientImpl config - for (Map.Entry e : conf.entrySet()) { - allProps.put(e.getKey(), conf.get(e.getKey())); + Properties allProps = new Properties(); + // first load the defaults from spark-defaults.conf if available + try { + URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf"); + if (sparkDefaultsUrl != null) { + LOG.info("Loading spark defaults: " + sparkDefaultsUrl); + allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl))); } - allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId); - allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret); - allProps.put(DRIVER_OPTS_KEY, driverJavaOpts); - allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts); - - String isTesting = conf.get("spark.testing"); - if (isTesting != null && isTesting.equalsIgnoreCase("true")) { - String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH")); - if (!hiveHadoopTestClasspath.isEmpty()) { - String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH)); - if (extraDriverClasspath.isEmpty()) { - allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath); - } else { - extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator; - allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath); - } + } catch (Exception e) { + String msg = "Exception trying to load spark-defaults.conf: " + e; + throw new IOException(msg, e); + } + // then load the SparkClientImpl config + for (Map.Entry e : conf.entrySet()) { + allProps.put(e.getKey(), conf.get(e.getKey())); + } + allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId); + allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret); + allProps.put(DRIVER_OPTS_KEY, driverJavaOpts); + allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts); + + String isTesting = conf.get("spark.testing"); + if (isTesting != null && isTesting.equalsIgnoreCase("true")) { + String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH")); + if (!hiveHadoopTestClasspath.isEmpty()) { + String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH)); + if (extraDriverClasspath.isEmpty()) { + allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath); + } else { + extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? 
extraDriverClasspath : extraDriverClasspath + File.pathSeparator; + allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath); + } - String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH)); - if (extraExecutorClasspath.isEmpty()) { - allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath); - } else { - extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator; - allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath); - } + String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH)); + if (extraExecutorClasspath.isEmpty()) { + allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath); + } else { + extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator; + allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath); } } + } - Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8); - try { - allProps.store(writer, "Spark Context configuration"); - } finally { - writer.close(); - } + Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8); + try { + allProps.store(writer, "Spark Context configuration"); + } finally { + writer.close(); + } - // Define how to pass options to the child process. If launching in client (or local) - // mode, the driver options need to be passed directly on the command line. Otherwise, - // SparkSubmit will take care of that for us. - String master = conf.get("spark.master"); - Preconditions.checkArgument(master != null, "spark.master is not defined."); - String deployMode = conf.get("spark.submit.deployMode"); + // Define how to pass options to the child process. If launching in client (or local) + // mode, the driver options need to be passed directly on the command line. Otherwise, + // SparkSubmit will take care of that for us. 
+ String master = conf.get("spark.master"); + Preconditions.checkArgument(master != null, "spark.master is not defined."); + String deployMode = conf.get("spark.submit.deployMode"); - List argv = Lists.newLinkedList(); + List argv = Lists.newLinkedList(); - if (sparkHome != null) { - argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath()); - } else { - LOG.info("No spark.home provided, calling SparkSubmit directly."); - argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath()); - - if (master.startsWith("local") || master.startsWith("mesos") || - SparkClientUtilities.isYarnClientMode(master, deployMode) || - master.startsWith("spark")) { - String mem = conf.get("spark.driver.memory"); - if (mem != null) { - argv.add("-Xms" + mem); - argv.add("-Xmx" + mem); - } + if (sparkHome != null) { + argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath()); + } else { + LOG.info("No spark.home provided, calling SparkSubmit directly."); + argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath()); + + if (master.startsWith("local") || master.startsWith("mesos") || + SparkClientUtilities.isYarnClientMode(master, deployMode) || + master.startsWith("spark")) { + String mem = conf.get("spark.driver.memory"); + if (mem != null) { + argv.add("-Xms" + mem); + argv.add("-Xmx" + mem); + } - String cp = conf.get("spark.driver.extraClassPath"); - if (cp != null) { - argv.add("-classpath"); - argv.add(cp); - } + String cp = conf.get("spark.driver.extraClassPath"); + if (cp != null) { + argv.add("-classpath"); + argv.add(cp); + } - String libPath = conf.get("spark.driver.extraLibPath"); - if (libPath != null) { - argv.add("-Djava.library.path=" + libPath); - } + String libPath = conf.get("spark.driver.extraLibPath"); + if (libPath != null) { + argv.add("-Djava.library.path=" + libPath); + } - String extra = conf.get(DRIVER_OPTS_KEY); - if (extra != null) { - for (String opt : extra.split("[ ]")) { - if (!opt.trim().isEmpty()) { - argv.add(opt.trim()); - } + String extra = conf.get(DRIVER_OPTS_KEY); + if (extra != null) { + for (String opt : extra.split("[ ]")) { + if (!opt.trim().isEmpty()) { + argv.add(opt.trim()); } } } - - argv.add("org.apache.spark.deploy.SparkSubmit"); } - if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) { - String executorCores = conf.get("spark.executor.cores"); - if (executorCores != null) { - argv.add("--executor-cores"); - argv.add(executorCores); - } + argv.add("org.apache.spark.deploy.SparkSubmit"); + } - String executorMemory = conf.get("spark.executor.memory"); - if (executorMemory != null) { - argv.add("--executor-memory"); - argv.add(executorMemory); - } + if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) { + String executorCores = conf.get("spark.executor.cores"); + if (executorCores != null) { + argv.add("--executor-cores"); + argv.add(executorCores); + } - String numOfExecutors = conf.get("spark.executor.instances"); - if (numOfExecutors != null) { - argv.add("--num-executors"); - argv.add(numOfExecutors); - } + String executorMemory = conf.get("spark.executor.memory"); + if (executorMemory != null) { + argv.add("--executor-memory"); + argv.add(executorMemory); } - // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh - // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive could only support doAs or - // delegation token renewal, but not both. Since doAs is a more common case, if both - // are needed, we choose to favor doAs. 
So when doAs is enabled, we use kinit command, - // otherwise, we pass the principal/keypad to spark to support the token renewal for - // long-running application. - if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) { - String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), - "0.0.0.0"); - String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); - if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) { - if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { - List kinitArgv = Lists.newLinkedList(); - kinitArgv.add("kinit"); - kinitArgv.add(principal); - kinitArgv.add("-k"); - kinitArgv.add("-t"); - kinitArgv.add(keyTabFile + ";"); - kinitArgv.addAll(argv); - argv = kinitArgv; - } else { - // if doAs is not enabled, we pass the principal/keypad to spark-submit in order to - // support the possible delegation token renewal in Spark - argv.add("--principal"); - argv.add(principal); - argv.add("--keytab"); - argv.add(keyTabFile); - } + + String numOfExecutors = conf.get("spark.executor.instances"); + if (numOfExecutors != null) { + argv.add("--num-executors"); + argv.add(numOfExecutors); + } + } + // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh + // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive could only support doAs or + // delegation token renewal, but not both. Since doAs is a more common case, if both + // are needed, we choose to favor doAs. So when doAs is enabled, we use kinit command, + // otherwise, we pass the principal/keypad to spark to support the token renewal for + // long-running application. + if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) { + String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL), + "0.0.0.0"); + String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); + if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) { + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + List kinitArgv = Lists.newLinkedList(); + kinitArgv.add("kinit"); + kinitArgv.add(principal); + kinitArgv.add("-k"); + kinitArgv.add("-t"); + kinitArgv.add(keyTabFile + ";"); + kinitArgv.addAll(argv); + argv = kinitArgv; + } else { + // if doAs is not enabled, we pass the principal/keypad to spark-submit in order to + // support the possible delegation token renewal in Spark + argv.add("--principal"); + argv.add(principal); + argv.add("--keytab"); + argv.add(keyTabFile); } } - if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { - try { - String currentUser = Utils.getUGI().getShortUserName(); - // do not do impersonation in CLI mode - if (!currentUser.equals(System.getProperty("user.name"))) { - LOG.info("Attempting impersonation of " + currentUser); - argv.add("--proxy-user"); - argv.add(currentUser); - } - } catch (Exception e) { - String msg = "Cannot obtain username: " + e; - throw new IllegalStateException(msg, e); + } + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + try { + String currentUser = Utils.getUGI().getShortUserName(); + // do not do impersonation in CLI mode + if (!currentUser.equals(System.getProperty("user.name"))) { + LOG.info("Attempting impersonation of " + currentUser); + argv.add("--proxy-user"); + argv.add(currentUser); } + } catch (Exception e) { + String msg = "Cannot obtain username: " + e; + throw new 
IllegalStateException(msg, e); } + } - String regStr = conf.get("spark.kryo.registrator"); - if (HIVE_KRYO_REG_NAME.equals(regStr)) { - argv.add("--jars"); - argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf)); - } + String regStr = conf.get("spark.kryo.registrator"); + if (HIVE_KRYO_REG_NAME.equals(regStr)) { + argv.add("--jars"); + argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf)); + } - argv.add("--properties-file"); - argv.add(properties.getAbsolutePath()); - argv.add("--class"); - argv.add(RemoteDriver.class.getName()); + argv.add("--properties-file"); + argv.add(properties.getAbsolutePath()); + argv.add("--class"); + argv.add(RemoteDriver.class.getName()); - String jar = "spark-internal"; - if (SparkContext.jarOfClass(this.getClass()).isDefined()) { - jar = SparkContext.jarOfClass(this.getClass()).get(); - } - argv.add(jar); - - argv.add("--remote-host"); - argv.add(serverAddress); - argv.add("--remote-port"); - argv.add(serverPort); - - //hive.spark.* keys are passed down to the RemoteDriver via --conf, - //as --properties-file contains the spark.* keys that are meant for SparkConf object. - for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) { - String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey); - argv.add("--conf"); - argv.add(String.format("%s=%s", hiveSparkConfKey, value)); - } + String jar = "spark-internal"; + if (SparkContext.jarOfClass(this.getClass()).isDefined()) { + jar = SparkContext.jarOfClass(this.getClass()).get(); + } + argv.add(jar); + + argv.add("--remote-host"); + argv.add(serverAddress); + argv.add("--remote-port"); + argv.add(serverPort); + + //hive.spark.* keys are passed down to the RemoteDriver via --conf, + //as --properties-file contains the spark.* keys that are meant for SparkConf object. + for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) { + String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey); + argv.add("--conf"); + argv.add(String.format("%s=%s", hiveSparkConfKey, value)); + } - String cmd = Joiner.on(" ").join(argv); - LOG.info("Running client driver with argv: {}", cmd); - ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd); - - // Prevent hive configurations from being visible in Spark. - pb.environment().remove("HIVE_HOME"); - pb.environment().remove("HIVE_CONF_DIR"); - // Add credential provider password to the child process's environment - // In case of Spark the credential provider location is provided in the jobConf when the job is submitted - String password = getSparkJobCredentialProviderPassword(); - if(password != null) { - pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password); - } - if (isTesting != null) { - pb.environment().put("SPARK_TESTING", isTesting); - } + String cmd = Joiner.on(" ").join(argv); + LOG.info("Running client driver with argv: {}", cmd); + ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd); + + // Prevent hive configurations from being visible in Spark. 
+ pb.environment().remove("HIVE_HOME"); + pb.environment().remove("HIVE_CONF_DIR"); + // Add credential provider password to the child process's environment + // In case of Spark the credential provider location is provided in the jobConf when the job is submitted + String password = getSparkJobCredentialProviderPassword(); + if(password != null) { + pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password); + } + if (isTesting != null) { + pb.environment().put("SPARK_TESTING", isTesting); + } - final Process child = pb.start(); - String threadName = Thread.currentThread().getName(); - final List childErrorLog = Collections.synchronizedList(new ArrayList()); - final LogRedirector.LogSourceCallback callback = () -> {return isAlive;}; + final Process child = pb.start(); + String threadName = Thread.currentThread().getName(); + final List childErrorLog = Collections.synchronizedList(new ArrayList()); + final LogRedirector.LogSourceCallback callback = () -> {return isAlive;}; - LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName, - new LogRedirector(child.getInputStream(), LOG, callback)); - LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName, - new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback)); + LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName, + new LogRedirector(child.getInputStream(), LOG, callback)); + LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName, + new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback)); - runnable = new Runnable() { - @Override - public void run() { - try { - int exitCode = child.waitFor(); - if (exitCode != 0) { - StringBuilder errStr = new StringBuilder(); - synchronized(childErrorLog) { - Iterator iter = childErrorLog.iterator(); - while(iter.hasNext()){ - errStr.append(iter.next()); - errStr.append('\n'); - } + runnable = new Runnable() { + @Override + public void run() { + try { + int exitCode = child.waitFor(); + if (exitCode != 0) { + StringBuilder errStr = new StringBuilder(); + synchronized(childErrorLog) { + Iterator iter = childErrorLog.iterator(); + while(iter.hasNext()){ + errStr.append(iter.next()); + errStr.append('\n'); } - - LOG.warn("Child process exited with code {}", exitCode); - rpcServer.cancelClient(clientId, - "Child process (spark-submit) exited before connecting back with error log " + errStr.toString()); } - } catch (InterruptedException ie) { - LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process."); - rpcServer.cancelClient(clientId, "Thread waiting on the child porcess (spark-submit) is interrupted"); - Thread.interrupted(); - child.destroy(); - } catch (Exception e) { - String errMsg = "Exception while waiting for child process (spark-submit)"; - LOG.warn(errMsg, e); - rpcServer.cancelClient(clientId, errMsg); + + LOG.warn("Child process exited with code {}", exitCode); + rpcServer.cancelClient(clientId, + "Child process (spark-submit) exited before connecting back with error log " + errStr.toString()); } + } catch (InterruptedException ie) { + LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process."); + rpcServer.cancelClient(clientId, "Thread waiting on the child porcess (spark-submit) is interrupted"); + Thread.interrupted(); + child.destroy(); + } catch (Exception e) { + String errMsg = "Exception while waiting for child process (spark-submit)"; + LOG.warn(errMsg, e); + rpcServer.cancelClient(clientId, errMsg); } - }; - 
} + } + }; Thread thread = new Thread(runnable); thread.setDaemon(true); diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java index 697d8d144d..23df7920d3 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java @@ -68,19 +68,14 @@ private static final long TIMEOUT = 20; private static final HiveConf HIVECONF = new HiveConf(); - private Map createConf(boolean local) { + private Map createConf() { Map conf = new HashMap(); - if (local) { - conf.put(SparkClientFactory.CONF_KEY_IN_PROCESS, "true"); - conf.put("spark.master", "local"); - conf.put("spark.app.name", "SparkClientSuite Local App"); - } else { - String classpath = System.getProperty("java.class.path"); - conf.put("spark.master", "local"); - conf.put("spark.app.name", "SparkClientSuite Remote App"); - conf.put("spark.driver.extraClassPath", classpath); - conf.put("spark.executor.extraClassPath", classpath); - } + + String classpath = System.getProperty("java.class.path"); + conf.put("spark.master", "local"); + conf.put("spark.app.name", "SparkClientSuite Remote App"); + conf.put("spark.driver.extraClassPath", classpath); + conf.put("spark.executor.extraClassPath", classpath); if (!Strings.isNullOrEmpty(System.getProperty("spark.home"))) { conf.put("spark.home", System.getProperty("spark.home")); @@ -91,7 +86,7 @@ @Test public void testJobSubmission() throws Exception { - runTest(true, new TestFunction() { + runTest(new TestFunction() { @Override public void call(SparkClient client) throws Exception { JobHandle.Listener listener = newListener(); @@ -112,7 +107,7 @@ public void call(SparkClient client) throws Exception { @Test public void testSimpleSparkJob() throws Exception { - runTest(true, new TestFunction() { + runTest(new TestFunction() { @Override public void call(SparkClient client) throws Exception { JobHandle handle = client.submit(new SparkJob()); @@ -123,7 +118,7 @@ public void call(SparkClient client) throws Exception { @Test public void testErrorJob() throws Exception { - runTest(true, new TestFunction() { + runTest(new TestFunction() { @Override public void call(SparkClient client) throws Exception { JobHandle.Listener listener = newListener(); @@ -151,7 +146,7 @@ public void call(SparkClient client) throws Exception { @Test public void testSyncRpc() throws Exception { - runTest(true, new TestFunction() { + runTest(new TestFunction() { @Override public void call(SparkClient client) throws Exception { Future result = client.run(new SyncRpc()); @@ -160,20 +155,9 @@ public void call(SparkClient client) throws Exception { }); } - @Test - public void testRemoteClient() throws Exception { - runTest(false, new TestFunction() { - @Override - public void call(SparkClient client) throws Exception { - JobHandle handle = client.submit(new SparkJob()); - assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS)); - } - }); - } - @Test public void testMetricsCollection() throws Exception { - runTest(true, new TestFunction() { + runTest(new TestFunction() { @Override public void call(SparkClient client) throws Exception { JobHandle.Listener listener = newListener(); @@ -202,7 +186,7 @@ public void call(SparkClient client) throws Exception { @Test public void testAddJarsAndFiles() throws Exception { - runTest(true, new TestFunction() { + runTest(new TestFunction() { @Override public void 
call(SparkClient client) throws Exception { File jar = null; @@ -256,7 +240,7 @@ public void call(SparkClient client) throws Exception { @Test public void testCounters() throws Exception { - runTest(true, new TestFunction() { + runTest(new TestFunction() { @Override public void call(SparkClient client) throws Exception { JobHandle job = client.submit(new CounterIncrementJob()); @@ -308,8 +292,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { }).when(listener); } - private void runTest(boolean local, TestFunction test) throws Exception { - Map conf = createConf(local); + private void runTest(TestFunction test) throws Exception { + Map conf = createConf(); SparkClientFactory.initialize(conf); SparkClient client = null; try {
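
The driver-launch path in the hunk above keeps the RPC secret off the spark-submit command line: every setting, including spark.client.authentication.secret, is written to a temporary properties file that only the owner can read, and the child process receives just the file path via --properties-file. Below is a minimal JDK-only sketch of that pattern; the class and method names are illustrative and not part of the patch (the patch itself uses Guava's Charsets where this sketch uses StandardCharsets).

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Sketch of the owner-only properties file used to hand configuration
// (including the connection secret) to spark-submit without exposing it in
// the child process's command line. Class and method names are illustrative.
public class SecretPropertiesFileSketch {

  static File writeProps(Properties allProps) throws IOException {
    File properties = File.createTempFile("spark-submit.", ".properties");
    // Remove read access for everyone, then grant it back to the owner only.
    if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
      throw new IOException("Cannot change permissions of job properties file.");
    }
    properties.deleteOnExit();
    try (Writer writer = new OutputStreamWriter(
        new FileOutputStream(properties), StandardCharsets.UTF_8)) {
      allProps.store(writer, "Spark Context configuration");
    }
    return properties;
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.put("spark.client.authentication.secret", "never-shown-in-ps-output");
    System.out.println("--properties-file " + writeProps(props).getAbsolutePath());
  }
}

Only the file path ever reaches the command line; the secret stays in the file, which is deleted when the JVM exits.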
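
For Kerberos, the comment in the hunk above explains that --principal/--keytab cannot be combined with --proxy-user, so the launcher picks one mechanism: with doAs enabled it prefixes the whole command with kinit, otherwise it hands the principal and keytab to spark-submit so delegation tokens can be renewed for a long-running application. The sketch below mirrors that choice under the assumption that the principal, keytab path and doAs flag are already resolved; class and method names are illustrative.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

// Sketch of the Kerberos branch when building the spark-submit command.
// The trailing ";" after the keytab works because SparkClientImpl later joins
// the whole list into one string and runs it with "sh -c".
public class KerberosLaunchSketch {

  static List<String> withKerberos(List<String> argv, String principal,
      String keyTabFile, boolean doAsEnabled) {
    if (doAsEnabled) {
      // doAs: authenticate up front with kinit, then run the original command.
      List<String> kinitArgv = new LinkedList<>(
          Arrays.asList("kinit", principal, "-k", "-t", keyTabFile + ";"));
      kinitArgv.addAll(argv);
      return kinitArgv;
    }
    // No doAs: let spark-submit hold the principal/keytab for token renewal.
    argv.add("--principal");
    argv.add(principal);
    argv.add("--keytab");
    argv.add(keyTabFile);
    return argv;
  }

  public static void main(String[] args) {
    List<String> argv = new LinkedList<>(
        Arrays.asList("spark-submit", "--class", "RemoteDriver"));
    System.out.println(String.join(" ",
        withKerberos(argv, "hive/host@EXAMPLE.COM", "/etc/hive.keytab", true)));
  }
}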
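
Because the driver now always runs as a separate spark-submit process, the client watches the child from a daemon thread, buffers its stderr, and cancels the pending RPC client if the process exits non-zero before connecting back. The sketch below approximates that behaviour with plain JDK classes only; the real code uses Hive's LogRedirector and RpcServer.cancelClient, for which reportFailure stands in here.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-JDK approximation of the child-process watchdog: capture stderr while
// spark-submit runs, and surface the exit code plus whatever it printed if it
// dies before the remote driver connects back.
public class DriverWatchdogSketch {

  static void watch(Process child) {
    final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<>());

    // Keep the child's error output so it can be reported on failure.
    Thread stderrReader = new Thread(() -> {
      try (BufferedReader reader = new BufferedReader(
          new InputStreamReader(child.getErrorStream(), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
          childErrorLog.add(line);
        }
      } catch (Exception ignored) {
        // the stream closes when the child exits
      }
    });
    stderrReader.setDaemon(true);
    stderrReader.start();

    // Wait for the child; a non-zero exit is reported with the captured log.
    Thread monitor = new Thread(() -> {
      try {
        int exitCode = child.waitFor();
        if (exitCode != 0) {
          String err;
          synchronized (childErrorLog) {
            err = String.join("\n", childErrorLog);
          }
          reportFailure("Child process (spark-submit) exited with code "
              + exitCode + " and error log:\n" + err);
        }
      } catch (InterruptedException ie) {
        child.destroy();
        Thread.currentThread().interrupt();
      }
    });
    monitor.setDaemon(true);
    monitor.start();
  }

  // Stand-in for rpcServer.cancelClient(clientId, message) in SparkClientImpl.
  static void reportFailure(String message) {
    System.err.println(message);
  }

  public static void main(String[] args) throws Exception {
    Process child = new ProcessBuilder("sh", "-c", "echo boom >&2; exit 1").start();
    watch(child);
    Thread.sleep(2000); // give the daemon threads time to report before exiting
  }
}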
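
On the test side, createConf() no longer takes a local flag: every TestSparkClient case now goes through the spark-submit path with a local master and the test JVM's classpath exported to the driver and executors, which is what removes the flaky in-process mode. A sketch of that configuration, mirroring the map the test builds before calling SparkClientFactory.initialize(conf) as shown in the diff; the class name is illustrative, and a plain null/empty check replaces Guava's Strings.isNullOrEmpty.

import java.util.HashMap;
import java.util.Map;

// Sketch of the configuration TestSparkClient now builds for every case:
// a local master plus the test JVM's classpath on both driver and executors,
// so the separately launched driver process can load the test classes.
public class LocalTestConfSketch {

  static Map<String, String> createConf() {
    Map<String, String> conf = new HashMap<>();
    String classpath = System.getProperty("java.class.path");
    conf.put("spark.master", "local");
    conf.put("spark.app.name", "SparkClientSuite Remote App");
    conf.put("spark.driver.extraClassPath", classpath);
    conf.put("spark.executor.extraClassPath", classpath);
    String sparkHome = System.getProperty("spark.home");
    if (sparkHome != null && !sparkHome.isEmpty()) {
      conf.put("spark.home", sparkHome);
    }
    return conf;
  }

  public static void main(String[] args) {
    createConf().forEach((key, value) -> System.out.println(key + "=" + value));
  }
}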