diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
index a456d6c..b6dd8cb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
@@ -42,4 +42,9 @@
    * Get the count of executors
    */
   public int getExecutorCount() throws Exception;
+
+  /**
+   * Get default parallelism. For standalone mode, this can be used to get total number of cores.
+   */
+  public int getDefaultParallelism() throws Exception;
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 78c1da8..d0abec8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -62,7 +62,6 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
     sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
     sparkConf.put("spark.serializer",
       "org.apache.spark.serializer.KryoSerializer");
-    sparkConf.put("spark.default.parallelism", "1");
 
     // load properties from spark-defaults.conf.
     InputStream inputStream = null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 5cfdcec..467147c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -95,6 +95,11 @@ public int getExecutorCount() {
   }
 
   @Override
+  public int getDefaultParallelism() throws Exception {
+    return sc.sc().defaultParallelism();
+  }
+
+  @Override
   public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
     Context ctx = driverContext.getCtx();
     HiveConf hiveConf = (HiveConf) ctx.getConf();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 87b47a6..59003c4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -95,6 +95,14 @@ public int getExecutorCount() throws Exception {
   }
 
   @Override
+  public int getDefaultParallelism() throws Exception {
+    long timeout = hiveConf.getTimeVar(
+        HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+    Future<Integer> handler = remoteClient.getDefaultParallelism();
+    return handler.get(timeout, TimeUnit.SECONDS);
+  }
+
+  @Override
   public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
     final Context ctx = driverContext.getCtx();
     final HiveConf hiveConf = (HiveConf) ctx.getConf();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 12e0e28..343f16f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -68,19 +68,34 @@ public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) thro
   @Override
   public Tuple2<Long, Integer> getMemoryAndCores() throws Exception {
     SparkConf sparkConf = hiveSparkClient.getSparkConf();
-    int cores = sparkConf.getInt("spark.executor.cores", 1);
-    double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
+    int numExecutors = hiveSparkClient.getExecutorCount();
+    // at start-up, we may be unable to get number of executors
+    if (numExecutors <= 0) {
+      return new Tuple2<Long, Integer>(-1L, -1);
+    }
     int executorMemoryInMB = Utils.memoryStringToMb(
-      sparkConf.get("spark.executor.memory", "512m"));
-    long memoryPerTaskInBytes =
-      (long) (executorMemoryInMB * memoryFraction * 1024 * 1024 / cores);
-    int executors = hiveSparkClient.getExecutorCount();
-    int totalCores = executors * cores;
-    LOG.info("Spark cluster current has executors: " + executors
-      + ", cores per executor: " + cores + ", memory per executor: "
-      + executorMemoryInMB + "M, shuffle memoryFraction: " + memoryFraction);
+        sparkConf.get("spark.executor.memory", "512m"));
+    double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
+    long totalMemory = (long) (numExecutors * executorMemoryInMB * memoryFraction * 1024 * 1024);
+    int totalCores;
+    String masterURL = sparkConf.get("spark.master");
+    if (masterURL.startsWith("spark")) {
+      totalCores = sparkConf.contains("spark.default.parallelism") ?
+          sparkConf.getInt("spark.default.parallelism", 8) :
+          hiveSparkClient.getDefaultParallelism();
+      totalCores = Math.max(totalCores, numExecutors);
+    } else {
+      int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
+      totalCores = numExecutors * coresPerExecutor;
+    }
+    totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
+
+    long memoryPerTaskInBytes = totalMemory / totalCores;
+    LOG.info("Spark cluster current has executors: " + numExecutors
+        + ", total cores: " + totalCores + ", memory per executor: "
+        + executorMemoryInMB + "M, memoryFraction: " + memoryFraction);
     return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
-      Integer.valueOf(totalCores));
+        Integer.valueOf(totalCores));
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 613b5bc..b247fcf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -117,7 +117,7 @@ public Object process(Node nd, Stack<Node> stack,
             context.getConf(), sparkSessionManager);
           sparkMemoryAndCores = sparkSession.getMemoryAndCores();
         } catch (Exception e) {
-          throw new SemanticException("Failed to get spark memory/core info: " + e, e);
+          LOG.warn("Failed to get spark memory/core info", e);
         } finally {
           if (sparkSession != null && sparkSessionManager != null) {
             try {
@@ -130,18 +130,24 @@ public Object process(Node nd, Stack<Node> stack,
         }
 
         // Divide it by 2 so that we can have more reducers
-        long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
+        long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER) / 2;
         int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
-          maxReducers, false);
+            maxReducers, false);
+
+        if (sparkMemoryAndCores != null &&
+            sparkMemoryAndCores._1() > 0 && sparkMemoryAndCores._2() > 0) {
+          // warn the user if bytes per reducer is much larger than memory per task
+          if ((double) sparkMemoryAndCores._1() / bytesPerReducer < 0.5) {
+            LOG.warn("Average load of a reducer is much larger than its available memory. " +
+                "Consider decreasing hive.exec.reducers.bytes.per.reducer");
+          }
 
-        // If there are more cores, use the number of cores
-        int cores = sparkMemoryAndCores._2.intValue();
-        if (numReducers < cores) {
-          numReducers = cores;
+          // If there are more cores, use the number of cores
+          numReducers = Math.max(numReducers, sparkMemoryAndCores._2());
         }
-        LOG.info("Set parallelism parameters: cores = " + cores + ", numReducers = " + numReducers +
-          ", bytesPerReducer = " + bytesPerReducer + ", numberOfBytes = " + numberOfBytes);
-        LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers + " (calculated)");
+        numReducers = Math.min(numReducers, maxReducers);
+        LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers +
+            " (calculated)");
         desc.setNumReducers(numReducers);
       }
     } else {
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
index 9757ced..57bc2be 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
@@ -72,4 +72,9 @@
    * Get the count of executors
    */
   Future<Integer> getExecutorCount();
+
+  /**
+   * Get default parallelism. For standalone mode, this can be used to get total number of cores.
+   */
+  Future<Integer> getDefaultParallelism();
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 54eacfd..aa08029 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -155,6 +155,11 @@ public void stop() {
     return submit(new GetExecutorCountJob());
   }
 
+  @Override
+  public Future<Integer> getDefaultParallelism() {
+    return submit(new GetDefaultParallelismJob());
+  }
+
   void cancel(String jobId) {
     protocol.cancel(jobId);
   }
@@ -489,10 +494,20 @@ public Serializable call(JobContext jc) throws Exception {
 
     @Override
     public Integer call(JobContext jc) throws Exception {
-      int count = jc.sc().sc().getExecutorMemoryStatus().size();
+      // minus 1 here otherwise driver is also counted as an executor
+      int count = jc.sc().sc().getExecutorMemoryStatus().size() - 1;
      return Integer.valueOf(count);
     }
   }
 
+  private static class GetDefaultParallelismJob implements Job<Integer> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Integer call(JobContext jc) throws Exception {
+      return jc.sc().sc().defaultParallelism();
+    }
+  }
+
 }