diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index 6b3cb80..9aea69d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -45,54 +45,21 @@ import org.apache.spark.api.java.JavaSparkContext;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 public class SparkClient implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
   protected static transient final Log LOG = LogFactory.getLog(SparkClient.class);
 
-  private static String masterUrl = "local";
+  private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
+  private static final String SPARK_DEFAULT_MASTER = "local";
+  private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
 
-  private static String appName = "Hive-Spark";
-
-  private static String sparkHome = "/home/xzhang/apache/spark";
-
-  private static int reducerCount = 1;
-
-  private static String execMem = "1g";
-  private static String execJvmOpts = "";
-
-  static {
-    String envSparkHome = System.getenv("SPARK_HOME");
-    if (envSparkHome != null) {
-      sparkHome = envSparkHome;
-    }
-
-    String envMaster = System.getenv("MASTER");
-    if (envMaster != null) {
-      masterUrl = envMaster;
-    }
-
-    String reducers = System.getenv("REDUCERS");
-    if (reducers != null) {
-      reducerCount = Integer.valueOf(reducers);
-    }
-
-    String mem = System.getenv("spark_executor_memory");
-    if (mem != null) {
-      execMem = mem;
-    }
-
-    String jopts = System.getenv("spark_executor_extraJavaOptions");
-    if (jopts != null) {
-      execJvmOpts = jopts;
-    }
-
-  }
-
   private static SparkClient client = new SparkClient();
 
   public static SparkClient getInstance() {
@@ -106,12 +73,41 @@ public static SparkClient getInstance() {
   private List localFiles = new ArrayList();
 
   private SparkClient() {
-    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(masterUrl).setSparkHome(sparkHome);
-    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    sparkConf.set("spark.default.parallelism", "1");
-    sparkConf.set("spark.executor.memory", execMem);
-    sparkConf.set("spark.executor.extraJavaOptions", execJvmOpts);
-    sc = new JavaSparkContext(sparkConf);
+    sc = new JavaSparkContext(initiateSparkConf());
+  }
+
+  private SparkConf initiateSparkConf() {
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.set("spark.master", SPARK_DEFAULT_MASTER);
+    sparkConf.set("spark.app.name", SPARK_DEFAULT_APP_NAME);
+    InputStream inputStream = null;
+    try {
+      inputStream = this.getClass().getClassLoader().getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+      if (inputStream != null) {
+        LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
+        Properties properties = new Properties();
+        properties.load(inputStream);
+        for (String propertyName : properties.stringPropertyNames()) {
+          if (propertyName.startsWith("spark")) {
+            String value = properties.getProperty(propertyName);
+            sparkConf.set(propertyName, value);
+            LOG.info(String.format("load spark configuration from %s (%s -> %s).",
+                SPARK_DEFAULT_CONF_FILE, propertyName, value));
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.info("Failed to open spark configuration file:" + SPARK_DEFAULT_CONF_FILE, e);
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          LOG.debug("Failed to close inputstream.", e);
+        }
+      }
+    }
+    return sparkConf;
   }
 
   public int execute(DriverContext driverContext, SparkWork sparkWork) {
@@ -191,7 +187,7 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
         }
       }
     } else {
-      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(reducerCount/*redWork.getNumReduceTasks()*/)); // Two partitions.
+      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1/*redWork.getNumReduceTasks()*/)); // Single partition for now.
       HiveReduceFunction rf = new HiveReduceFunction(confBytes);
      JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf);
       rdd4.foreach(HiveVoidFunction.getInstance());
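
With this change, Spark tuning moves from environment variables (SPARK_HOME, MASTER, REDUCERS, spark_executor_memory, spark_executor_extraJavaOptions) to a spark-defaults.conf file on the Hive classpath: initiateSparkConf() copies every property whose name starts with "spark" into the SparkConf. For reference, a minimal spark-defaults.conf the loader would pick up might look like the sketch below; the property names come from the code above and the removed defaults, while the values are purely illustrative and not shipped with the patch.

  # spark-defaults.conf (illustrative values only)
  spark.master                     local
  spark.app.name                   Hive on Spark
  spark.serializer                 org.apache.spark.serializer.KryoSerializer
  spark.executor.memory            1g
  spark.executor.extraJavaOptions  -XX:+PrintGCDetails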