diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index ff4924d..7a5b71f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -53,6 +53,8 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(SetSparkReducerParallelism.class.getName());
 
+  private static final String SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled";
+
   // Spark memory per task, and total number of cores
   private ObjectPair<Long, Integer> sparkMemoryAndCores;
 
@@ -109,34 +111,12 @@ public Object process(Node nd, Stack<Node> stack,
           }
         }
 
-        if (sparkMemoryAndCores == null) {
-          SparkSessionManager sparkSessionManager = null;
-          SparkSession sparkSession = null;
-          try {
-            sparkSessionManager = SparkSessionManagerImpl.getInstance();
-            sparkSession = SparkUtilities.getSparkSession(
-                context.getConf(), sparkSessionManager);
-            sparkMemoryAndCores = sparkSession.getMemoryAndCores();
-          } catch (HiveException e) {
-            throw new SemanticException("Failed to get a spark session: " + e);
-          } catch (Exception e) {
-            LOG.warn("Failed to get spark memory/core info", e);
-          } finally {
-            if (sparkSession != null && sparkSessionManager != null) {
-              try {
-                sparkSessionManager.returnSession(sparkSession);
-              } catch (HiveException ex) {
-                LOG.error("Failed to return the session to SessionManager: " + ex, ex);
-              }
-            }
-          }
-        }
-
         // Divide it by 2 so that we can have more reducers
         long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER) / 2;
         int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
             maxReducers, false);
 
+        getSparkMemoryAndCores(context);
         if (sparkMemoryAndCores != null &&
             sparkMemoryAndCores.getFirst() > 0 && sparkMemoryAndCores.getSecond() > 0) {
           // warn the user if bytes per reducer is much larger than memory per task
@@ -184,4 +164,34 @@ private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveC
     return false;
   }
 
+  private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws SemanticException {
+    if (context.getConf().getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, false)) {
+      // If dynamic allocation is enabled, the numbers for memory and cores are meaningless,
+      // so we don't try to get them.
+      sparkMemoryAndCores = null;
+      return;
+    }
+
+    SparkSessionManager sparkSessionManager = null;
+    SparkSession sparkSession = null;
+    try {
+      sparkSessionManager = SparkSessionManagerImpl.getInstance();
+      sparkSession = SparkUtilities.getSparkSession(
+          context.getConf(), sparkSessionManager);
+      sparkMemoryAndCores = sparkSession.getMemoryAndCores();
+    } catch (HiveException e) {
+      throw new SemanticException("Failed to get a spark session: " + e);
+    } catch (Exception e) {
+      LOG.warn("Failed to get spark memory/core info", e);
+    } finally {
+      if (sparkSession != null && sparkSessionManager != null) {
+        try {
+          sparkSessionManager.returnSession(sparkSession);
+        } catch (HiveException ex) {
+          LOG.error("Failed to return the session to SessionManager: " + ex, ex);
+        }
+      }
+    }
+  }
+
 }
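Note (not part of the patch): a minimal standalone sketch of the guard this change introduces. The reducer estimate based on bytes-per-reducer still runs, but the memory/core lookup is skipped whenever spark.dynamicAllocation.enabled is true, because the executor count (and therefore the total core count) is not fixed at planning time. The sketch uses plain Hadoop Configuration in place of HiveConf, and fetchFromSession() is a hypothetical stand-in for going through SparkSessionManagerImpl and SparkSession.getMemoryAndCores().

import org.apache.hadoop.conf.Configuration;

public class DynamicAllocationGuardSketch {

  // Same property key the patch defines as SPARK_DYNAMIC_ALLOCATION_ENABLED.
  private static final String SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled";

  // Hypothetical stand-in for sparkSession.getMemoryAndCores();
  // returns {memoryPerTask in bytes, totalCores}.
  static long[] fetchFromSession() {
    return new long[] {512L * 1024 * 1024, 8L};
  }

  // Mirrors the control flow of getSparkMemoryAndCores(): return null when dynamic
  // allocation is on, otherwise fetch the numbers from the session.
  static long[] getSparkMemoryAndCores(Configuration conf) {
    if (conf.getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, false)) {
      return null;
    }
    return fetchFromSession();
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.setBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, true);
    // With dynamic allocation enabled, the caller falls back to the bytes-per-reducer
    // estimate only, instead of capping the reducer count by total cores.
    System.out.println(getSparkMemoryAndCores(conf) == null
        ? "dynamic allocation on: skip memory/core based capping"
        : "static allocation: cap reducers by total cores");
  }
}

Returning null rather than a zero-valued pair keeps the caller's existing guard intact, matching the patch's check that sparkMemoryAndCores is non-null and both values are positive before using them.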