Index: src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
===================================================================
--- src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java	(revision 1614847)
+++ src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java	(working copy)
@@ -18,28 +18,19 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.commons.lang.StringUtils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.*;
-
 public class SparkClient implements Serializable {
   private static final long serialVersionUID = 1L;
 
   protected static transient final Log LOG = LogFactory.getLog(SparkClient.class);
@@ -59,14 +50,14 @@
 
   private JavaSparkContext sc;
 
-  private List<String> localJars = new ArrayList<String>();
-
-  private List<String> localFiles = new ArrayList<String>();
-
   private SparkClient(Configuration hiveConf) {
     sc = new JavaSparkContext(initiateSparkConf(hiveConf));
   }
 
+  public JavaSparkContext getSparkContext() {
+    return sc;
+  }
+
   private SparkConf initiateSparkConf(Configuration hiveConf) {
     SparkConf sparkConf = new SparkConf();
 
@@ -120,104 +111,4 @@
     return sparkConf;
   }
 
-  public int execute(DriverContext driverContext, SparkWork sparkWork) {
-    Context ctx = driverContext.getCtx();
-    HiveConf hiveConf = (HiveConf) ctx.getConf();
-    refreshLocalResources(sparkWork, hiveConf);
-    JobConf jobConf = new JobConf(hiveConf);
-
-    // Create temporary scratch dir
-    Path emptyScratchDir;
-    try {
-      emptyScratchDir = ctx.getMRTmpPath();
-      FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
-      fs.mkdirs(emptyScratchDir);
-    } catch (IOException e) {
-      e.printStackTrace();
-      System.err.println("Error launching map-reduce job" + "\n"
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
-      return 5;
-    }
-
-    // Generate Spark plan
-    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
-    SparkPlan plan;
-    try {
-      plan = gen.generate(sparkWork);
-    } catch (Exception e) {
-      e.printStackTrace();
-      return 2;
-    }
-
-    // Execute generated plan.
-    // TODO: we should catch any exception and return more meaningful error code.
-    plan.execute();
-    return 0;
-  }
-
-  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
-    // add hive-exec jar
-    String hiveJar = conf.getJar();
-    if (!localJars.contains(hiveJar)) {
-      localJars.add(hiveJar);
-      sc.addJar(hiveJar);
-    }
-    // add aux jars
-    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
-    if (StringUtils.isNotEmpty(auxJars) && StringUtils.isNotBlank(auxJars)) {
-      addResources(auxJars, localJars);
-    }
-
-    // add added jars
-    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
-    if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
-      addResources(addedJars, localJars);
-    }
-
-    // add plugin module jars on demand
-    final String MR_JAR_PROPERTY = "tmpjars";
-    // jobConf will hold all the configuration for hadoop, tez, and hive
-    JobConf jobConf = new JobConf(conf);
-    jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
-
-    for (BaseWork work : sparkWork.getAllWork()) {
-      work.configureJobConf(jobConf);
-    }
-
-    String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
-    if (newTmpJars != null && newTmpJars.length > 0) {
-      for (String tmpJar : newTmpJars) {
-        if (StringUtils.isNotEmpty(tmpJar) && StringUtils.isNotBlank(tmpJar)
-            && !localJars.contains(tmpJar)) {
-          localJars.add(tmpJar);
-          sc.addJar(tmpJar);
-        }
-      }
-    }
-
-    // add added files
-    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
-    if (StringUtils.isNotEmpty(addedFiles) && StringUtils.isNotBlank(addedFiles)) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
-      addResources(addedFiles, localFiles);
-    }
-
-    // add added archives
-    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
-    if (StringUtils.isNotEmpty(addedArchives) && StringUtils.isNotBlank(addedArchives)) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
-      addResources(addedArchives, localFiles);
-    }
-  }
-
-  private void addResources(String addedFiles, List<String> localCache) {
-    for (String addedFile : addedFiles.split(",")) {
-      if (StringUtils.isNotEmpty(addedFile) && StringUtils.isNotBlank(addedFile)
-          && !localCache.contains(addedFile)) {
-        localCache.add(addedFile);
-        sc.addFile(addedFile);
-      }
-    }
-  }
 }
Index: src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
===================================================================
--- src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java	(revision 1614847)
+++ src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java	(working copy)
@@ -18,8 +18,16 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -27,26 +35,65 @@
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaSparkContext;
 
+/**
+ * SparkTask handles the execution of a SparkWork: it localizes the required
+ * resources, generates the Spark plan, and executes it through the shared
+ * SparkClient.
+ */
 public class SparkTask extends Task<SparkWork> {
+  private static final long serialVersionUID = 1L;
+
+  protected static transient final Log LOG = LogFactory.getLog(SparkTask.class);
+
+  private List<String> localJars = new ArrayList<String>();
+  private List<String> localFiles = new ArrayList<String>();
 
   @Override
   public int execute(DriverContext driverContext) {
     int rc = 1;
-    SparkClient client = null;
+    Context ctx = null;
+    JavaSparkContext sc = null;
     try {
-      client = SparkClient.getInstance(driverContext.getCtx().getConf());
-      rc = client.execute(driverContext, getWork());
+      ctx = driverContext.getCtx();
+      HiveConf hiveConf = (HiveConf) ctx.getConf();
+      JobConf jobConf = new JobConf(hiveConf);
+      sc = SparkClient.getInstance(hiveConf).getSparkContext();
+      SparkWork sparkWork = getWork();
+      refreshLocalResources(sc, sparkWork, hiveConf);
+
+      // Create temporary scratch dir
+      Path emptyScratchDir = ctx.getMRTmpPath();
+      FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
+      fs.mkdirs(emptyScratchDir);
+
+      // Generate the Spark plan and execute it
+      submit(ctx, sc, jobConf, sparkWork, emptyScratchDir);
+
+      // TODO: need to add job monitoring
+      return 0;
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.error("Failed to execute Spark job.", e);
+      // rc is still 1 at this point, indicating failure.
     } finally {
-      if (client != null) {
-        rc = close(rc);
+      if (sc != null) {
+        rc = close(rc);
       }
     }
     return rc;
   }
 
+  private void submit(Context ctx, JavaSparkContext sc, JobConf jobConf, SparkWork sparkWork,
+      Path emptyScratchDir) throws Exception {
+    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
+    SparkPlan plan = gen.generate(sparkWork);
+    plan.execute();
+  }
+
   /**
    * close will move the temp files into the right place for the fetch
    * task. If the job has failed it will clean up the files.
@@ -65,7 +112,7 @@
         rc = 3;
         String mesg = "Job Commit failed with exception '" +
             Utilities.getNameMessage(e) + "'";
-        console.printError(mesg, "\n" + StringUtils.stringifyException(e));
+        console.printError(mesg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
       }
     }
     return rc;
@@ -85,5 +132,70 @@
   public String getName() {
     return "SPARK";
   }
 
+  private void refreshLocalResources(JavaSparkContext sc, SparkWork sparkWork, HiveConf conf) {
+    // add hive-exec jar
+    String hiveJar = conf.getJar();
+    if (!localJars.contains(hiveJar)) {
+      localJars.add(hiveJar);
+      sc.addJar(hiveJar);
+    }
+    // add aux jars
+    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+    if (StringUtils.isNotEmpty(auxJars) && StringUtils.isNotBlank(auxJars)) {
+      addResources(sc, auxJars, localJars);
+    }
+    // add added jars
+    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+    if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+      addResources(sc, addedJars, localJars);
+    }
+    // add plugin module jars on demand
+    final String MR_JAR_PROPERTY = "tmpjars";
+    // jobConf will hold all the configuration for hadoop, tez, and hive
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
+
+    for (BaseWork work : sparkWork.getAllWork()) {
+      work.configureJobConf(jobConf);
+    }
+
+    String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
+    if (newTmpJars != null && newTmpJars.length > 0) {
+      for (String tmpJar : newTmpJars) {
+        if (StringUtils.isNotEmpty(tmpJar) && StringUtils.isNotBlank(tmpJar)
+            && !localJars.contains(tmpJar)) {
+          localJars.add(tmpJar);
+          sc.addJar(tmpJar);
+        }
+      }
+    }
+
+    // add added files
+    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+    if (StringUtils.isNotEmpty(addedFiles) && StringUtils.isNotBlank(addedFiles)) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+      addResources(sc, addedFiles, localFiles);
+    }
+
+    // add added archives
+    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+    if (StringUtils.isNotEmpty(addedArchives) && StringUtils.isNotBlank(addedArchives)) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+      addResources(sc, addedArchives, localFiles);
+    }
+  }
+
+  private void addResources(JavaSparkContext sc, String addedFiles, List<String> localCache) {
+    for (String addedFile : addedFiles.split(",")) {
+      if (StringUtils.isNotEmpty(addedFile) && StringUtils.isNotBlank(addedFile)
+          && !localCache.contains(addedFile)) {
+        localCache.add(addedFile);
+        sc.addFile(addedFile);
+      }
+    }
+  }
+
 }
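
For context, a minimal usage sketch (not part of the patch) of how the refactored API is meant to be consumed after this change: SparkClient is reduced to a singleton wrapper around the shared JavaSparkContext, and callers such as SparkTask register their own local resources before submitting work. Only SparkClient.getInstance(), getSparkContext(), addJar()/addFile() and HiveConf.getJar() come from the patch above; the class name and the sample file path are hypothetical.

package org.apache.hadoop.hive.ql.exec.spark;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.spark.api.java.JavaSparkContext;

// Illustrative sketch only; class name and sample file path are hypothetical.
public class SparkClientUsageSketch {
  public static void main(String[] args) {
    HiveConf hiveConf = new HiveConf();

    // SparkClient now only manages the SparkConf/JavaSparkContext lifecycle;
    // job-specific logic (plan generation, resource shipping) lives in SparkTask.
    JavaSparkContext sc = SparkClient.getInstance(hiveConf).getSparkContext();

    // Callers register their own resources, as SparkTask.refreshLocalResources() does.
    sc.addJar(hiveConf.getJar());            // ship the hive-exec jar to the cluster
    sc.addFile("/tmp/example-added-file");   // hypothetical added file

    // Plan generation and execution would then follow via SparkPlanGenerator,
    // as done in SparkTask.submit().
  }
}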