diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 12e0e28..bcb493f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -68,19 +68,26 @@ public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) thro
   @Override
   public Tuple2 getMemoryAndCores() throws Exception {
     SparkConf sparkConf = hiveSparkClient.getSparkConf();
-    int cores = sparkConf.getInt("spark.executor.cores", 1);
-    double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
+    String masterURL = sparkConf.get("spark.master");
+    int coresPerExecutor;
+    if (masterURL.startsWith("spark")) {
+      // TODO: need a way to get cores per executor in non-yarn mode
+      coresPerExecutor = 8;
+    } else {
+      coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
+    }
+    double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
     int executorMemoryInMB = Utils.memoryStringToMb(
-      sparkConf.get("spark.executor.memory", "512m"));
+        sparkConf.get("spark.executor.memory", "512m"));
     long memoryPerTaskInBytes =
-      (long) (executorMemoryInMB * memoryFraction * 1024 * 1024 / cores);
+        (long) (executorMemoryInMB * memoryFraction * 1024 * 1024 / coresPerExecutor);
     int executors = hiveSparkClient.getExecutorCount();
-    int totalCores = executors * cores;
+    int totalCores = executors * coresPerExecutor;
     LOG.info("Spark cluster current has executors: " + executors
-      + ", cores per executor: " + cores + ", memory per executor: "
-      + executorMemoryInMB + "M, shuffle memoryFraction: " + memoryFraction);
+        + ", cores per executor: " + coresPerExecutor + ", memory per executor: "
+        + executorMemoryInMB + "M, memoryFraction: " + memoryFraction);
     return new Tuple2(Long.valueOf(memoryPerTaskInBytes),
-      Integer.valueOf(totalCores));
+        Integer.valueOf(totalCores));
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 613b5bc..e36b2ab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -130,7 +130,7 @@ public Object process(Node nd, Stack stack,
         }
 
         // Divide it by 2 so that we can have more reducers
-        long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
+        long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER) / 2;
         int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
             maxReducers, false);
 
@@ -139,6 +139,9 @@ public Object process(Node nd, Stack stack,
         if (numReducers < cores) {
           numReducers = cores;
         }
+        if (numReducers > maxReducers) {
+          numReducers = maxReducers;
+        }
         LOG.info("Set parallelism parameters: cores = " + cores + ", numReducers = "
             + numReducers + ", bytesPerReducer = " + bytesPerReducer + ", numberOfBytes = "
             + numberOfBytes);
         LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers + " (calculated)");
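
For reference, here is a minimal standalone sketch (not part of the patch) of the arithmetic changed above: per-task memory is estimated as executor memory times (1 - spark.storage.memoryFraction) divided by cores per executor, and the reducer count is derived from hive.exec.reducers.bytes.per.reducer (halved) and then clamped between the total core count and maxReducers. All concrete values are assumed defaults, and Utilities.estimateReducers is approximated by a simple ceiling division.

// Illustrative sketch only; the values below are assumed defaults, not taken from the patch.
public class MemoryAndCoresSketch {
  public static void main(String[] args) {
    // --- First hunk: memory available per task ---
    int executorMemoryInMB = 512;      // assumed default of "spark.executor.memory"
    double storageFraction = 0.6;      // assumed default of "spark.storage.memoryFraction"
    int coresPerExecutor = 1;          // assumed default of "spark.executor.cores"

    // Non-storage memory, split evenly across the executor's task slots.
    double memoryFraction = 1.0 - storageFraction;
    long memoryPerTaskInBytes =
        (long) (executorMemoryInMB * memoryFraction * 1024 * 1024 / coresPerExecutor);
    System.out.println("memoryPerTaskInBytes = " + memoryPerTaskInBytes); // about 205 MB

    // --- Second hunk: reducers from bytes-per-reducer, clamped to [totalCores, maxReducers] ---
    long numberOfBytes = 10L * 1024 * 1024 * 1024;  // assumed 10 GB feeding the reduce sink
    long bytesPerReducer = 256L * 1024 * 1024 / 2;  // assumed BYTESPERREDUCER default, halved
    int maxReducers = 1009;                         // assumed hive.exec.reducers.max
    int totalCores = 4;                             // executors * coresPerExecutor

    // Rough stand-in for Utilities.estimateReducers(): ceiling division, at least 1.
    int numReducers = (int) Math.max(1L, (numberOfBytes + bytesPerReducer - 1) / bytesPerReducer);
    if (numReducers < totalCores) {
      numReducers = totalCores;
    }
    if (numReducers > maxReducers) {
      numReducers = maxReducers;
    }
    System.out.println("numReducers = " + numReducers); // 80 with these assumptions
  }
}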