Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java	(revision 1614618)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java	(working copy)
@@ -18,29 +18,19 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.commons.lang.StringUtils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.*;
-
 public class SparkClient implements Serializable {
   private static final long serialVersionUID = 1L;
 
   protected static transient final Log LOG = LogFactory.getLog(SparkClient.class);
@@ -60,10 +50,6 @@
 
   private JavaSparkContext sc;
 
-  private List<String> localJars = new ArrayList<String>();
-
-  private List<String> localFiles = new ArrayList<String>();
-
   private SparkClient(Configuration hiveConf) {
     sc = new JavaSparkContext(initiateSparkConf(hiveConf));
   }
@@ -121,7 +107,11 @@
     return sparkConf;
   }
 
-  public int execute(DriverContext driverContext, SparkWork sparkWork) {
+  public JavaSparkContext getSparkContext(){
+    return sc;
+  }
+
+  /* public int execute(DriverContext driverContext, SparkWork sparkWork) {
     HiveConf hiveConf = (HiveConf)driverContext.getCtx().getConf();
 
     refreshLocalResources(sparkWork, hiveConf);
@@ -165,7 +155,7 @@
       Utilities.createTmpDirs(jobConf, redWork);
     } catch (IOException e1) {
       e1.printStackTrace();
-    }
+    }*/
 
 /*    try {
       Path planPath = new Path(jobConf.getWorkingDirectory(), "plan.xml");
@@ -207,7 +197,7 @@
       }
     }
 */
-    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
+    /* SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
     SparkPlan plan;
     try {
       plan = gen.generate(sparkWork);
@@ -287,5 +277,5 @@
         sc.addFile(addedFile);
       }
     }
-  }
+  }*/
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java	(revision 1614618)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java	(working copy)
@@ -18,31 +18,113 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.api.java.JavaSparkContext;
 
 public class SparkTask extends Task<SparkWork> {
   private static final long serialVersionUID = 1L;
+
+  protected static transient final Log LOG = LogFactory.getLog(SparkTask.class);
+
+  private List<String> localJars = new ArrayList<String>();
+  private List<String> localFiles = new ArrayList<String>();
 
   @Override
   public int execute(DriverContext driverContext) {
     int rc = 1;
-    SparkClient client = null;
+    Context ctx = null;
+    JavaSparkContext sc = null;
+    boolean cleanContext = false;
     try {
-      client = SparkClient.getInstance(driverContext.getCtx().getConf());
-      rc = client.execute(driverContext, getWork());
+      HiveConf hiveConf = (HiveConf) (driverContext.getCtx().getConf());
+      JobConf jobConf = new JobConf(hiveConf);
+      sc = SparkClient.getInstance(hiveConf).getSparkContext();
+
+      SparkWork sparkWork = getWork();
+      MapWork mapWork = sparkWork.getMapWork();
+      ReduceWork redWork = sparkWork.getReduceWork();
+
+      refreshLocalResources(sc, sparkWork, hiveConf);
+
+      ctx = driverContext.getCtx();
+      Path emptyScratchDir;
+      if (ctx == null) {
+        ctx = new Context(jobConf);
+        cleanContext = true;
+      }
+
+      emptyScratchDir = ctx.getMRTmpPath();
+      FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
+      fs.mkdirs(emptyScratchDir);
+
+      List<Path> inputPaths;
+      try {
+        inputPaths = Utilities.getInputPaths(jobConf, mapWork, emptyScratchDir, ctx, false);
+      } catch (Exception e2) {
+        e2.printStackTrace();
+        return -1;
+      }
+
+      Utilities.setInputPaths(jobConf, inputPaths);
+      Utilities.setMapWork(jobConf, mapWork, emptyScratchDir, true);
+      if (redWork != null)
+        Utilities.setReduceWork(jobConf, redWork, emptyScratchDir, true);
+
+      try {
+        Utilities.createTmpDirs(jobConf, mapWork);
+        Utilities.createTmpDirs(jobConf, redWork);
+      } catch (IOException e1) {
+        e1.printStackTrace();
+      }
+
+      SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
+      SparkPlan plan;
+      try {
+        plan = gen.generate(sparkWork);
+      } catch (Exception e) {
+        e.printStackTrace();
+        return 2;
+      }
+      plan.execute();
+      // TODO: need to add job monitoring
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Failed to execute spark job.", e);
+      // rc will be 1 at this point indicating failure.
     } finally {
-      if (client != null) {
-        rc = close(rc);
+      Utilities.clearWork(conf);
+      if (cleanContext) {
+        try {
+          ctx.clear();
+        } catch (Exception e) {
+          /*best effort*/
+          LOG.warn("Failed to clean up after spark job");
+        }
       }
+      if (sc != null) {
+        rc = close(rc);
+      }
     }
     return rc;
   }
@@ -65,7 +147,7 @@
         rc = 3;
         String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'";
-        console.printError(mesg, "\n" + StringUtils.stringifyException(e));
+        console.printError(mesg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
       }
     }
     return rc;
   }
@@ -85,5 +167,75 @@
   public String getName() {
     return "SPARK";
   }
 
+  private void refreshLocalResources(JavaSparkContext sc, SparkWork sparkWork, HiveConf conf) {
+    // add hive-exec jar
+    String hiveJar = conf.getJar();
+    if (!localJars.contains(hiveJar)) {
+      localJars.add(hiveJar);
+      sc.addJar(hiveJar);
+    }
+    // add aux jars
+    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+    if (StringUtils.isNotEmpty(auxJars) && StringUtils.isNotBlank(auxJars)) {
+      addResources(sc, auxJars, localJars);
+    }
+    // add added jars
+    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+    if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+      addResources(sc, addedJars, localJars);
+    }
+    // add plugin module jars on demand
+    final String MR_JAR_PROPERTY = "tmpjars";
+    // jobConf will hold all the configuration for hadoop, tez, and hive
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
+    // TODO update after SparkCompiler finished.
+    // for (BaseWork work : sparkWork.getAllWork()) {
+    //   work.configureJobConf(jobConf);
+    // }
+    sparkWork.getMapWork().configureJobConf(jobConf);
+    ReduceWork redWork = sparkWork.getReduceWork();
+    if (redWork != null) {
+      redWork.configureJobConf(jobConf);
+    }
+    String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
+    if (newTmpJars != null && newTmpJars.length > 0) {
+      for (String tmpJar : newTmpJars) {
+        if (StringUtils.isNotEmpty(tmpJar) && StringUtils.isNotBlank(tmpJar)
+            && !localJars.contains(tmpJar)) {
+          localJars.add(tmpJar);
+          sc.addJar(tmpJar);
+        }
+      }
+    }
+
+    // add added files
+    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+    if (StringUtils.isNotEmpty(addedFiles) && StringUtils.isNotBlank(addedFiles)) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+      addResources(sc, addedFiles, localFiles);
+    }
+
+    // add added archives
+    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+    if (StringUtils.isNotEmpty(addedArchives) && StringUtils.isNotBlank(addedArchives)) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+      addResources(sc, addedArchives, localFiles);
+    }
+  }
+
+  private void addResources(JavaSparkContext sc, String addedFiles, List<String> localCache) {
+    for (String addedFile : addedFiles.split(",")) {
+      if (StringUtils.isNotEmpty(addedFile) && StringUtils.isNotBlank(addedFile)
+          && !localCache.contains(addedFile)) {
+        localCache.add(addedFile);
+        sc.addFile(addedFile);
+      }
+    }
+  }
+
 }