diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 240c1f5..9adf4c1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2167,6 +2167,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVE_EXECUTION_MODE("hive.execution.mode", "container", new StringSet("container", "llap"),
         "Chooses whether query fragments will run in container or in llap"),
+    HIVE_EXECUTION_ENGINE_AVAILABLE("hive.execution.engine.available", "",
+        "Comma-separated list of engines available for automatic selection. An empty value means all engines are available."),
     HIVE_EXECUTION_ENGINE_SELECTOR("hive.execution.engine.selector", null,
         "Chooses execution engine by EngineSelector"),
     HIVE_EXECUTION_ENGINE_SELECTOR_PARAM("hive.execution.engine.selector.param", null,
         "Parameter which will be handed over to EngineSelector"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/EngineSelector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/EngineSelector.java
index b058676..c8719ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/EngineSelector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/EngineSelector.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Map;
@@ -50,16 +51,17 @@
    * @param conf
    * @param parseContext
    * @param param
+   * @param available active engines for selection
    * @return
    */
-  Engine select(HiveConf conf, ParseContext parseContext, String param);
+  Engine select(HiveConf conf, ParseContext parseContext, String param, EnumSet<Engine> available);
 
   public static class SimpleSelector implements EngineSelector {
 
     static final Log LOG = LogFactory.getLog(EngineSelector.class.getName());
 
     @Override
-    public Engine select(HiveConf conf, ParseContext pctx, String param) {
+    public Engine select(HiveConf conf, ParseContext pctx, String param, EnumSet<Engine> available) {
 
       Map<String, PrunedPartitionList> pruned = pctx.getPrunedPartitions();
       Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
@@ -75,7 +77,7 @@ public Engine select(HiveConf conf, ParseContext pctx, String param) {
         ContentSummary summary = Utilities.getInputSummary(pctx.getContext(), dummy, null);
         if (!summary.isValidSummary()) {
-          LOG.info("Cannot estimate size of " + alias);
+          LOG.info("Cannot estimate the size of " + alias);
           total = -1;
           break;
         }
@@ -86,13 +88,13 @@ public Engine select(HiveConf conf, ParseContext pctx, String param) {
        param = param.replaceAll("$num_aliases", String.valueOf(topOps.size()));
        param = param.replaceAll("$total", String.valueOf(total));
        param = param.replaceAll("$biggest", String.valueOf(biggest));
-       param = param.replaceAll("(\\d)+kb", "$1 * 1024L");
-       param = param.replaceAll("(\\d)+mb", "$1 * 1024L * 1024L");
-       param = param.replaceAll("(\\d)+gb", "$1 * 1024L * 1024L * 1024L");
-       param = param.replaceAll("(\\d)+tb", "$1 * 1024L * 1024L * 1024L * 1024L");
+       param = param.replaceAll("(\\d)+( )?kb", "$1 * 1024L");
+       param = param.replaceAll("(\\d)+( )?mb", "$1 * 1024L * 1024L");
+       param = param.replaceAll("(\\d)+( )?gb", "$1 * 1024L * 1024L * 1024L");
+       param = param.replaceAll("(\\d)+( )?tb", "$1 * 1024L * 1024L * 1024L * 1024L");
 
-       Engine selected = evaluate(param.trim(), total < 0);
-       LOG.info("Engine selected " + selected);
+       Engine selected = evaluate(available, param.trim(), total < 0);
+       LOG.info("Execution engine selected " + selected);
        return selected;
      } catch (Exception e) {
        LOG.warn("Failed to select engine by exception ", e);
@@ -100,7 +102,7 @@ public Engine select(HiveConf conf, ParseContext pctx, String param) {
      return Engine.DEFAULT;
    }
 
-    Engine evaluate(String expressions, boolean skipEval) throws Exception {
+    Engine evaluate(EnumSet<Engine> available, String expressions, boolean skipEval) throws Exception {
      ParseDriver driver = new ParseDriver();
      CalcitePlanner.ASTSearcher searcher = new CalcitePlanner.ASTSearcher();
@@ -109,11 +111,16 @@ Engine evaluate(String expressions, boolean skipEval) throws Exception {
        expression = expression.trim();
        String[] split = expression.split("=");
        if (split.length == 1) {
-          return Engine.valueOf(split[0].trim().toUpperCase());
+          Engine engine = engineOfName(split[0]);
+          if (available.contains(engine)) {
+            return engine;
+          }
+          continue;
        }
        if (skipEval) {
          continue;
        }
+        // hack: wrap the expression in a dummy SELECT so the existing expression parser can evaluate it
        ASTNode astNode = driver.parseSelect("select " + split[1], null);
        ASTNode target = (ASTNode) searcher.simpleBreadthFirstSearch(
@@ -123,10 +130,17 @@ Engine evaluate(String expressions, boolean skipEval) throws Exception {
        evaluator.initialize(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
        BooleanObjectInspector inspector = (BooleanObjectInspector) evaluator.getOutputOI();
        if (inspector.get(evaluator.evaluate(null))) {
-          return Engine.valueOf(split[0].trim().toUpperCase());
+          Engine engine = engineOfName(split[0]);
+          if (available.contains(engine)) {
+            return engine;
+          }
        }
      }
      return null;
    }
+
+    private Engine engineOfName(String engineName) {
+      return Engine.valueOf(engineName.trim().toUpperCase());
+    }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index eac812f..f5e33c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -98,7 +98,7 @@ public int execute(DriverContext driverContext) {
     try {
       printConfigInfo();
       sparkSessionManager = SparkSessionManagerImpl.getInstance();
-      sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
+      sparkSession = SessionState.get().getSparkSession();
 
       SparkWork sparkWork = getWork();
       sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index cf2c3bc..68d4d38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -120,22 +120,6 @@ public static boolean isDedicatedCluster(Configuration conf) {
     return master.startsWith("yarn-") || master.startsWith("local");
   }
 
-  public static SparkSession getSparkSession(HiveConf conf,
-      SparkSessionManager sparkSessionManager) throws HiveException {
-    SparkSession sparkSession = SessionState.get().getSparkSession();
-
-    // Spark configurations are updated close the existing session
-    if (conf.getSparkConfigUpdated()) {
-      sparkSessionManager.closeSession(sparkSession);
-      sparkSession = null;
-      conf.setSparkConfigUpdated(false);
-    }
-    sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
-    SessionState.get().setSparkSession(sparkSession);
-    return sparkSession;
-  }
-
-
   public static String rddGraphToString(JavaPairRDD rdd) {
     StringBuilder sb = new StringBuilder();
     rddToString(rdd.rdd(), sb, "");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 5f9225c..cb82e70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -31,7 +31,6 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
@@ -44,6 +43,7 @@
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 
 /**
  * SetSparkReducerParallelism determines how many reducers should
@@ -114,8 +114,7 @@ public Object process(Node nd, Stack stack,
           SparkSession sparkSession = null;
           try {
             sparkSessionManager = SparkSessionManagerImpl.getInstance();
-            sparkSession = SparkUtilities.getSparkSession(
-                context.getConf(), sparkSessionManager);
+            sparkSession = SessionState.get().getSparkSession();
             sparkMemoryAndCores = sparkSession.getMemoryAndCores();
           } catch (HiveException e) {
             throw new SemanticException("Failed to get a spark session: " + e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 4af07ad..f0f06b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -10206,6 +10206,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce
       TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
       compiler.init(conf, console, db);
       compiler.compile(pCtx, rootTasks, inputs, outputs);
+      compiler.initSession(SessionState.get());
       fetchTask = pCtx.getFetchTask();
     }
     LOG.info("Completed plan generation");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 81d02da..745e0d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
 import com.google.common.collect.Interner;
@@ -81,6 +82,11 @@ public void init(HiveConf conf, LogHelper console, Hive db) {
     this.console = console;
   }
 
+  // Called when compilation is completed.
+  // Initializes whatever the engine needs (a session, for example) to execute the query.
+  public void initSession(SessionState session) {
+  }
+
   @SuppressWarnings({"nls", "unchecked"})
   public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
       final HashSet<ReadEntity> inputs, final HashSet<WriteEntity> outputs) throws SemanticException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
index 038b3c1..14267dc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
@@ -21,10 +21,16 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.EngineSelector;
+import org.apache.hadoop.hive.ql.exec.EngineSelector.Engine;
 import org.apache.hadoop.hive.ql.parse.spark.SparkCompiler;
 import org.apache.hive.common.util.ReflectionUtil;
 
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
 /**
  * TaskCompilerFactory is a factory class to choose the appropriate
  * TaskCompiler.
@@ -44,33 +50,52 @@ private TaskCompilerFactory() {
   public static TaskCompiler getCompiler(HiveConf conf, ParseContext parseContext) {
     EngineSelector selector = getSelector(conf);
     if (selector != null) {
-      String param = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE_SELECTOR_PARAM);
-      EngineSelector.Engine selected = selector.select(conf, parseContext, param);
-      if (selected == EngineSelector.Engine.MR) {
-        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+      String param = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE_SELECTOR_PARAM);
+      Engine selected = selector.select(conf, parseContext, param, getAvailableEngines(conf));
+      if (selected == Engine.MR) {
+        HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_ENGINE, "mr");
         return new MapReduceCompiler();
       }
-      if (selected == EngineSelector.Engine.TEZ) {
-        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+      if (selected == Engine.TEZ) {
+        HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_ENGINE, "tez");
         return new TezCompiler();
       }
-      if (selected == EngineSelector.Engine.SPARK) {
-        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+      if (selected == Engine.SPARK) {
+        HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_ENGINE, "spark");
         return new SparkCompiler();
       }
     }
-    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+    if (HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       return new TezCompiler();
-    } else if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+    } else if (HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       return new SparkCompiler();
     } else {
       return new MapReduceCompiler();
     }
   }
 
+  private static EnumSet<Engine> getAvailableEngines(HiveConf conf) {
+    String available = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE_AVAILABLE).trim();
+    if (available.isEmpty()) {
+      return EnumSet.allOf(Engine.class);
+    }
+    List<Engine> engines = new ArrayList<Engine>();
+    for (String engine : available.split(",")) {
+      engine = engine.trim();
+      try {
+        if (!engine.isEmpty()) {
+          engines.add(Engine.valueOf(engine.toUpperCase()));
+        }
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Invalid engine name '" + engine + "'; skipping it");
+      }
+    }
+    return EnumSet.copyOf(engines);
+  }
+
   private static EngineSelector getSelector(HiveConf conf) {
-    String selector = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE_SELECTOR);
+    String selector = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE_SELECTOR);
     try {
       if (selector != null) {
         return (EngineSelector)ReflectionUtil.newInstance(conf.getClassByName(selector), conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index a60527b..3fa75d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -86,6 +87,7 @@
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
 /**
@@ -111,6 +113,37 @@ public void init(HiveConf conf, LogHelper console, Hive db) {
   }
 
   @Override
+  public void initSession(SessionState session) {
+    super.initSession(session);
+    if (session.isHiveServerQuery()) {
+      try {
+        TezSessionState tezSession = session.getTezSession();
+        if (tezSession == null) {
+          session.setTezSession(tezSession = new TezSessionState(session.getSessionId()));
+        }
+        if (tezSession.isOpen()) {
+          return;
+        }
+        if (tezSession.isOpening()) {
+          if (!session.isInitSessionAsync()) {
+            tezSession.endOpen(); // block on this
+          }
+          return;
+        }
+        // Neither open nor opening.
+        if (!session.isInitSessionAsync()) {
+          tezSession.open(session.getConf()); // should use conf on session start-up
+        } else {
+          tezSession.beginOpen(session.getConf(), null, console);
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to initialize tez session");
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
   protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs,
       Set outputs) throws SemanticException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 9ec7fd6..6a17b41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -44,6 +44,9 @@
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
@@ -63,6 +66,7 @@
 import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
 import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruningBySize;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -103,6 +107,26 @@ public SparkCompiler() {
   }
 
   @Override
+  public void initSession(SessionState session) {
+    super.initSession(session);
+
+    SparkSession sparkSession = session.getSparkSession();
+    // If the Spark configurations have been updated, close the existing session first.
+    try {
+      SparkSessionManager sparkSessionManager = SparkSessionManagerImpl.getInstance();
+      if (sparkSession != null && conf.getSparkConfigUpdated()) {
+        sparkSessionManager.closeSession(sparkSession);
+        conf.setSparkConfigUpdated(false);
+      }
+      sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
+      session.setSparkSession(sparkSession);
+    } catch (HiveException e) {
+      LOGGER.warn("Failed to init spark session", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   protected void optimizeOperatorPlan(ParseContext pCtx, Set inputs,
       Set outputs) throws SemanticException {
     PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
@@ -135,7 +159,7 @@ private void runDynamicPartitionPruning(OptimizeSparkProcContext procCtx)
     ParseContext parseContext = procCtx.getParseContext();
     Map opRules = new LinkedHashMap();
     opRules.put(
-        new RuleRegExp(new String("Dynamic Partition Pruning"),
+        new RuleRegExp("Dynamic Partition Pruning",
             FilterOperator.getOperatorName() + "%"),
         new DynamicPartitionPruningOptimization());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 41b4bb1..6b1f2c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -209,6 +209,8 @@
   private SparkSession sparkSession;
 
+  private transient boolean initSessionAsync;
+
   /**
    * Gets information about HDFS encryption
    */
@@ -472,12 +474,14 @@ public static void detachSession() {
    * when switching from one session to another.
    */
   public static SessionState start(SessionState startSs) {
-    start(startSs, false, null);
+    startSs.initSessionAsync = false;
+    start(startSs, null);
     return startSs;
   }
 
   public static void beginStart(SessionState startSs, LogHelper console) {
-    start(startSs, true, console);
+    startSs.initSessionAsync = true;
+    start(startSs, console);
   }
 
   public static void endStart(SessionState startSs)
@@ -486,7 +490,7 @@ public static void endStart(SessionState startSs)
     startSs.tezSessionState.endOpen();
   }
 
-  private static void start(SessionState startSs, boolean isAsync, LogHelper console) {
+  private static void start(SessionState startSs, LogHelper console) {
     setCurrentSessionState(startSs);
 
     if (startSs.hiveHist == null){
@@ -533,32 +537,6 @@ private static void start(SessionState startSs, boolean isAsync, LogHelper conso
      // that would cause ClassNoFoundException otherwise
      throw new RuntimeException(e);
    }
-
-    String engine = HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
-    if (!engine.equals("tez") || startSs.isHiveServerQuery) return;
-
-    try {
-      if (startSs.tezSessionState == null) {
-        startSs.tezSessionState = new TezSessionState(startSs.getSessionId());
-      }
-      if (startSs.tezSessionState.isOpen()) {
-        return;
-      }
-      if (startSs.tezSessionState.isOpening()) {
-        if (!isAsync) {
-          startSs.tezSessionState.endOpen();
-        }
-        return;
-      }
-      // Neither open nor opening.
-      if (!isAsync) {
-        startSs.tezSessionState.open(startSs.conf); // should use conf on session start-up
-      } else {
-        startSs.tezSessionState.beginOpen(startSs.conf, null, console);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
   }
 
   /**
@@ -620,7 +598,7 @@ private Path createRootHDFSDir(HiveConf conf) throws IOException {
       Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true);
     }
     FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission();
-    if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) {
+    if (rootHDFSDirPath.toUri() != null) {
       String schema = rootHDFSDirPath.toUri().getScheme();
       LOG.debug(
           "HDFS root scratch dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " +
@@ -1075,8 +1053,9 @@ public void reloadAuxJars() throws IOException {
 
     // remove the previous renewable jars
     try {
-      if (preReloadableAuxJars != null && !preReloadableAuxJars.isEmpty()) {
-        Utilities.removeFromClassPath(preReloadableAuxJars.toArray(new String[0]));
+      if (!preReloadableAuxJars.isEmpty()) {
+        Utilities.removeFromClassPath(preReloadableAuxJars.toArray(
+            new String[preReloadableAuxJars.size()]));
       }
     } catch (Exception e) {
       String msg = "Fail to remove the reloaded jars loaded last time: " + e;
@@ -1084,12 +1063,12 @@ public void reloadAuxJars() throws IOException {
     }
 
     try {
-      if (reloadedAuxJars != null && !reloadedAuxJars.isEmpty()) {
+      if (!reloadedAuxJars.isEmpty()) {
        URLClassLoader currentCLoader =
            (URLClassLoader) SessionState.get().getConf().getClassLoader();
        currentCLoader = (URLClassLoader) Utilities.addToClassPath(currentCLoader,
-            reloadedAuxJars.toArray(new String[0]));
+            reloadedAuxJars.toArray(new String[reloadedAuxJars.size()]));
        conf.setClassLoader(currentCLoader);
        Thread.currentThread().setContextClassLoader(currentCLoader);
      }
@@ -1106,7 +1085,8 @@ static void registerJars(List newJars) throws IllegalArgumentException {
     LogHelper console = getConsole();
     try {
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
-      ClassLoader newLoader = Utilities.addToClassPath(loader, newJars.toArray(new String[0]));
+      ClassLoader newLoader = Utilities.addToClassPath(loader,
+          newJars.toArray(new String[newJars.size()]));
       Thread.currentThread().setContextClassLoader(newLoader);
       SessionState.get().getConf().setClassLoader(newLoader);
       console.printInfo("Added " + newJars + " to class path");
@@ -1119,7 +1099,7 @@ static boolean unregisterJar(List jarsToUnregister) {
     LogHelper console = getConsole();
     try {
-      Utilities.removeFromClassPath(jarsToUnregister.toArray(new String[0]));
+      Utilities.removeFromClassPath(jarsToUnregister.toArray(new String[jarsToUnregister.size()]));
       console.printInfo("Deleted " + jarsToUnregister + " from class path");
       return true;
     } catch (Exception e) {
@@ -1643,6 +1623,10 @@ public void setSparkSession(SparkSession sparkSession) {
     this.sparkSession = sparkSession;
   }
 
+  public boolean isInitSessionAsync() {
+    return initSessionAsync;
+  }
+
   /**
    * Get the next suffix to use in naming a temporary table created by insert...values
    * @return suffix
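
Reviewer note (not part of the patch): a sketch of how the new knobs are intended to work together with the built-in SimpleSelector. The values below are illustrative only; in particular, the separator between selector clauses and the "$SimpleSelector" nested-class notation are assumptions that these hunks do not confirm.

    set hive.execution.engine.available=mr,spark;
    set hive.execution.engine.selector=org.apache.hadoop.hive.ql.exec.EngineSelector$SimpleSelector;
    set hive.execution.engine.selector.param=mr = $total < 1gb, spark;

With SimpleSelector, $num_aliases is replaced by the number of top-level operators (topOps.size()), while $total and $biggest are replaced by the input sizes computed from the input summaries; the kb/mb/gb/tb suffixes are expanded into byte multipliers. Each "engine = boolean expression" clause is evaluated in order, a bare engine name acts as the fallback, and a clause is only honored when its engine appears in hive.execution.engine.available (an empty list means every engine is available). If selection fails, Engine.DEFAULT is returned.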
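Also for reviewers: select() now receives the set of engines that may be chosen, so third-party selectors need to be updated and should filter their answer against it. A minimal sketch of an implementation under the new contract follows; the class name and the fallback policy are made up for illustration, only the interface signature comes from this patch.

    import java.util.EnumSet;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.EngineSelector;
    import org.apache.hadoop.hive.ql.exec.EngineSelector.Engine;
    import org.apache.hadoop.hive.ql.parse.ParseContext;

    // Hypothetical selector: prefers MR for single-source plans, Tez otherwise,
    // but never returns an engine the caller did not declare as available.
    public class SmallQueryFirstSelector implements EngineSelector {
      @Override
      public Engine select(HiveConf conf, ParseContext pctx, String param, EnumSet<Engine> available) {
        Engine preferred = pctx.getTopOps().size() <= 1 ? Engine.MR : Engine.TEZ;
        if (available.contains(preferred)) {
          return preferred;
        }
        // Fall back to any allowed engine, or let the caller apply its default.
        return available.isEmpty() ? Engine.DEFAULT : available.iterator().next();
      }
    }

Such a class would be wired in through hive.execution.engine.selector; since getSelector() instantiates it via ReflectionUtil.newInstance(), it presumably needs a no-argument constructor (an assumption based on how that call is typically used, not something these hunks show).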