diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index 6b3cb80..94db145 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -21,6 +21,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,57 +46,25 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 public class SparkClient implements Serializable {
   private static final long serialVersionUID = 1L;
 
   protected static transient final Log LOG = LogFactory.getLog(SparkClient.class);
 
-  private static String masterUrl = "local";
+  private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
+  private static final String SPARK_DEFAULT_MASTER = "local";
+  private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
 
-  private static String appName = "Hive-Spark";
-
-  private static String sparkHome = "/home/xzhang/apache/spark";
-
-  private static int reducerCount = 1;
+  private static SparkClient client;
 
-  private static String execMem = "1g";
-  private static String execJvmOpts = "";
-
-  static {
-    String envSparkHome = System.getenv("SPARK_HOME");
-    if (envSparkHome != null) {
-      sparkHome = envSparkHome;
-    }
-
-    String envMaster = System.getenv("MASTER");
-    if (envMaster != null) {
-      masterUrl = envMaster;
-    }
-
-    String reducers = System.getenv("REDUCERS");
-    if (reducers != null) {
-      reducerCount = Integer.valueOf(reducers);
-    }
-
-    String mem = System.getenv("spark_executor_memory");
-    if (mem != null) {
-      execMem = mem;
-    }
-
-    String jopts = System.getenv("spark_executor_extraJavaOptions");
-    if (jopts != null) {
-      execJvmOpts = jopts;
+  public static synchronized SparkClient getInstance(Configuration hiveConf) {
+    if (client == null) {
+      client = new SparkClient(hiveConf);
     }
-
-  }
-
-  private static SparkClient client = new SparkClient();
-
-  public static SparkClient getInstance() {
     return client;
   }
 
@@ -105,13 +74,60 @@ public static SparkClient getInstance() {
 
   private List localFiles = new ArrayList();
 
-  private SparkClient() {
-    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(masterUrl).setSparkHome(sparkHome);
-    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    sparkConf.set("spark.default.parallelism", "1");
-    sparkConf.set("spark.executor.memory", execMem);
-    sparkConf.set("spark.executor.extraJavaOptions", execJvmOpts);
-    sc = new JavaSparkContext(sparkConf);
+  private SparkClient(Configuration hiveConf) {
+    sc = new JavaSparkContext(initiateSparkConf(hiveConf));
+  }
+
+  private SparkConf initiateSparkConf(Configuration hiveConf) {
+    SparkConf sparkConf = new SparkConf();
+
+    // set default spark configurations.
+    sparkConf.set("spark.master", SPARK_DEFAULT_MASTER);
+    sparkConf.set("spark.app.name", SPARK_DEFAULT_APP_NAME);
+
+    // load properties from spark-defaults.conf.
+    InputStream inputStream = null;
+    try {
+      inputStream = this.getClass().getClassLoader().getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+      if (inputStream != null) {
+        LOG.info("loading spark properties from: " + SPARK_DEFAULT_CONF_FILE);
+        Properties properties = new Properties();
+        properties.load(inputStream);
+        for (String propertyName : properties.stringPropertyNames()) {
+          if (propertyName.startsWith("spark")) {
+            String value = properties.getProperty(propertyName);
+            sparkConf.set(propertyName, value);
+            LOG.info(String.format("load spark configuration from %s (%s -> %s).",
+              SPARK_DEFAULT_CONF_FILE, propertyName, value));
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.info("Failed to open spark configuration file: " + SPARK_DEFAULT_CONF_FILE, e);
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          LOG.debug("Failed to close inputstream.", e);
+        }
+      }
+    }
+
+    // load properties from hive configurations; these are applied last and override spark-defaults.conf.
+    Iterator<Map.Entry<String, String>> iterator = hiveConf.iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<String, String> entry = iterator.next();
+      String propertyName = entry.getKey();
+      if (propertyName.startsWith("spark")) {
+        String value = entry.getValue();
+        sparkConf.set(propertyName, value);
+        LOG.info(String.format("load spark configuration from hive configuration (%s -> %s).",
+          propertyName, value));
+      }
+    }
+
+    return sparkConf;
   }
 
   public int execute(DriverContext driverContext, SparkWork sparkWork) {
@@ -191,7 +207,7 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
         }
       }
     } else {
-      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(reducerCount/*redWork.getNumReduceTasks()*/)); // Two partitions.
+      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1/*redWork.getNumReduceTasks()*/));
       HiveReduceFunction rf = new HiveReduceFunction(confBytes);
       JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf);
       rdd4.foreach(HiveVoidFunction.getInstance());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 8a05e56..c358436 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -28,7 +28,7 @@
 
   @Override
   public int execute(DriverContext driverContext) {
-    SparkClient client = SparkClient.getInstance();
+    SparkClient client = SparkClient.getInstance(driverContext.getCtx().getConf());
     return client.execute(driverContext, getWork());
   }