diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index ee6f7d7..b3d55ea 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -18,16 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -39,6 +35,7 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -47,8 +44,15 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
 public class SparkClient implements Serializable {
   private static final long serialVersionUID = 1L;
+  protected static transient final Log LOG = LogFactory.getLog(SparkClient.class);
 
   private static String masterUrl = "local";
 
@@ -97,6 +101,10 @@ public static SparkClient getInstance() {
 
   private JavaSparkContext sc;
 
+  private List localJars = new ArrayList();
+
+  private List localFiles = new ArrayList();
+
   private SparkClient() {
     SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(masterUrl).setSparkHome(sparkHome);
     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@@ -104,16 +112,18 @@ private SparkClient() {
     sparkConf.set("spark.executor.memory", execMem);
     sparkConf.set("spark.executor.extraJavaOptions", execJvmOpts);
     sc = new JavaSparkContext(sparkConf);
-    addJars();
   }
 
   public int execute(DriverContext driverContext, SparkWork sparkWork) {
     int rc = 1;
    // System.out.println("classpath=\n"+System.getProperty("java.class.path") + "\n");
+
+    HiveConf hiveConf = (HiveConf)driverContext.getCtx().getConf();
+    refreshLocalResources(sparkWork, hiveConf);
+
     MapWork mapWork = sparkWork.getMapWork();
     ReduceWork redWork = sparkWork.getReduceWork();
-
-    Configuration hiveConf = driverContext.getCtx().getConf();
+
     // TODO: need to combine spark conf and hive conf
     JobConf jobConf = new JobConf(hiveConf);
 
@@ -204,25 +214,69 @@ private JavaPairRDD createRDD(JavaSparkContext sc, JobConf jobConf, MapWork mapW
     return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
   }
 
-  private void addJars() {
-    ClassLoader cl = ClassLoader.getSystemClassLoader();
-
-    System.out.println("-----------------------------------------------------");
-    URL[] urls = ((URLClassLoader)cl).getURLs();
+  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+    // add hive-exec jar
+    String hiveJar = conf.getJar();
+    if (!localJars.contains(hiveJar)) {
+      localJars.add(hiveJar);
+      sc.addJar(hiveJar);
+    }
+    // add aux jars
+    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+    if (StringUtils.isNotEmpty(auxJars) && StringUtils.isNotBlank(auxJars)) {
+      addResources(auxJars, localJars);
+    }
 
-    for(URL url: urls){
-      java.io.File file = new java.io.File(url.getFile());
-      if (file.exists() && file.isFile()) {
-        if (file.getName().contains("guava")) {
-          System.out.println("** skipping guava jar **: " + url.getFile());
-        } else {
-          System.out.println("adding jar: " + url.getFile());
-          sc.addJar(url.getFile());
+    // add added jars
+    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+    if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+      addResources(addedJars, localJars);
+    }
+    // add plugin module jars on demand
+    final String MR_JAR_PROPERTY = "tmpjars";
+    // jobConf will hold all the configuration for hadoop, tez, and hive
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
+    // TODO update after SparkCompiler finished.
+    // for (BaseWork work : sparkWork.getAllWork()) {
+    //   work.configureJobConf(jobConf);
+    // }
+    sparkWork.getMapWork().configureJobConf(jobConf);
+    sparkWork.getReduceWork().configureJobConf(jobConf);
+    String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
+    if (newTmpJars != null && newTmpJars.length > 0) {
+      for (String tmpJar : newTmpJars) {
+        if (StringUtils.isNotEmpty(tmpJar) && StringUtils.isNotBlank(tmpJar)
+            && !localJars.contains(tmpJar)) {
+          localJars.add(tmpJar);
+          sc.addJar(tmpJar);
        }
      }
    }
 
-    System.out.println("---------------------------------------------- ------");
+    //add added files
+    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+    if (StringUtils.isNotEmpty(addedFiles) && StringUtils.isNotBlank(addedFiles)) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+      addResources(addedFiles, localFiles);
+    }
+
+    // add added archives
+    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+    if (StringUtils.isNotEmpty(addedArchives) && StringUtils.isNotBlank(addedArchives)) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+      addResources(addedArchives, localFiles);
+    }
   }
 
+  private void addResources(String addedFiles, List localCache) {
+    for (String addedFile : addedFiles.split(",")) {
+      if (StringUtils.isNotEmpty(addedFile) && StringUtils.isNotBlank(addedFile)
+          && !localCache.contains(addedFile)) {
+        localCache.add(addedFile);
+        sc.addFile(addedFile);
+      }
+    }
+  }
 }
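
Reviewer note: the heart of the patch is the dedup-and-register pattern used by refreshLocalResources()/addResources(): each resource string is a comma-separated list of paths, previously unseen entries are remembered in a local cache, and only new entries are pushed to the SparkContext via addJar()/addFile(), so repeated execute() calls do not re-ship the same resources. Below is a minimal standalone sketch of that pattern under stated assumptions; the ResourceRegistrar class, register() method, and the /tmp/*.jar paths are illustrative only and are not part of this patch, and the Consumer callback stands in for JavaSparkContext.addJar/addFile.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative only: mirrors the dedup-and-register logic of SparkClient.addResources(),
// decoupled from JavaSparkContext so it can run standalone.
public class ResourceRegistrar {
  private final List<String> localCache = new ArrayList<>();

  // Splits a comma-separated resource list and registers each entry exactly once.
  public void register(String resourceList, Consumer<String> registerFn) {
    if (resourceList == null || resourceList.trim().isEmpty()) {
      return;
    }
    for (String resource : resourceList.split(",")) {
      String trimmed = resource.trim();
      if (!trimmed.isEmpty() && !localCache.contains(trimmed)) {
        localCache.add(trimmed);     // remember it so later calls skip it
        registerFn.accept(trimmed);  // e.g. sc::addJar or sc::addFile in SparkClient
      }
    }
  }

  public static void main(String[] args) {
    ResourceRegistrar jars = new ResourceRegistrar();
    // First call registers both jars; the second call is a no-op because both are cached.
    jars.register("/tmp/a.jar,/tmp/b.jar", path -> System.out.println("addJar: " + path));
    jars.register("/tmp/a.jar,/tmp/b.jar", path -> System.out.println("addJar: " + path));
  }
}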