From f19187e5e4ee213d9b5158614c46a8ed762fecf8 Mon Sep 17 00:00:00 2001
From: vkorukanti
Date: Thu, 14 Aug 2014 13:55:19 -0700
Subject: [PATCH] HIVE-7746: Cleanup SparkClient and make refreshLocalResources
 method synchronized.

---
 .../hadoop/hive/ql/exec/spark/SparkClient.java | 84 +++++++++-------------
 1 file changed, 35 insertions(+), 49 deletions(-)

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index bfb34ec..991ba5c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -42,9 +42,12 @@ public class SparkClient implements Serializable {
 
   private static final long serialVersionUID = 1L;
+
+  private static final String MR_JAR_PROPERTY = "tmpjars";
 
   protected static transient final Log LOG = LogFactory
       .getLog(SparkClient.class);
 
+  private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
   private static final String SPARK_DEFAULT_MASTER = "local";
   private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark";
 
@@ -164,72 +167,52 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
     return 0;
   }
 
-  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+  /**
+   * At this point a single SparkContext is used by more than one thread, so make this
+   * method synchronized.
+   *
+   * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
+   * issue we have to live with until multiple SparkContexts are supported in a single JVM.
+   */
+  private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
     // add hive-exec jar
-    String hiveJar = conf.getJar();
-    if (!localJars.contains(hiveJar)) {
-      localJars.add(hiveJar);
-      sc.addJar(hiveJar);
-    }
+    addJars(conf.getJar());
+
     // add aux jars
-    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
-    if (StringUtils.isNotEmpty(auxJars) && StringUtils.isNotBlank(auxJars)) {
-      addJars(auxJars);
-    }
+    addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
 
     // add added jars
-    String addedJars = Utilities.getResourceFiles(conf,
-        SessionState.ResourceType.JAR);
-    if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
-      addJars(addedJars);
-    }
+    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+    addJars(addedJars);
 
     // add plugin module jars on demand
-    final String MR_JAR_PROPERTY = "tmpjars";
     // jobConf will hold all the configuration for hadoop, tez, and hive
     JobConf jobConf = new JobConf(conf);
-    jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
-
+    jobConf.set(MR_JAR_PROPERTY, "");
     for (BaseWork work : sparkWork.getAllWork()) {
       work.configureJobConf(jobConf);
    }
-
-    String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
-    if (newTmpJars != null && newTmpJars.length > 0) {
-      for (String tmpJar : newTmpJars) {
-        if (StringUtils.isNotEmpty(tmpJar) && StringUtils.isNotBlank(tmpJar)
-            && !localJars.contains(tmpJar)) {
-          localJars.add(tmpJar);
-          sc.addJar(tmpJar);
-        }
-      }
-    }
+    addJars(jobConf.get(MR_JAR_PROPERTY));
 
     // add added files
-    String addedFiles = Utilities.getResourceFiles(conf,
-        SessionState.ResourceType.FILE);
-    if (StringUtils.isNotEmpty(addedFiles)
-        && StringUtils.isNotBlank(addedFiles)) {
-      HiveConf.setVar(conf,
-          HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
-      addResources(addedFiles);
-    }
+    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+    addResources(addedFiles);
 
     // add added archives
-    String addedArchives = Utilities.getResourceFiles(conf,
-        SessionState.ResourceType.ARCHIVE);
-    if (StringUtils.isNotEmpty(addedArchives)
-        && StringUtils.isNotBlank(addedArchives)) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
-      addResources(addedArchives);
-    }
+    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+    addResources(addedArchives);
   }
 
   private void addResources(String addedFiles) {
+    if (StringUtils.isBlank(addedFiles)) {
+      return;
+    }
+
     for (String addedFile : addedFiles.split(",")) {
-      if (StringUtils.isNotEmpty(addedFile)
-          && StringUtils.isNotBlank(addedFile)
-          && !localFiles.contains(addedFile)) {
+      if (StringUtils.isNotBlank(addedFile) && !localFiles.contains(addedFile)) {
         localFiles.add(addedFile);
         sc.addFile(addedFile);
       }
@@ -237,9 +220,12 @@ private void addResources(String addedFiles) {
   }
 
   private void addJars(String addedJars) {
+    if (StringUtils.isBlank(addedJars)) {
+      return;
+    }
+
     for (String addedJar : addedJars.split(",")) {
-      if (StringUtils.isNotEmpty(addedJar) && StringUtils.isNotBlank(addedJar)
-          && !localJars.contains(addedJar)) {
+      if (StringUtils.isNotBlank(addedJar) && !localJars.contains(addedJar)) {
         localJars.add(addedJar);
         sc.addJar(addedJar);
       }
-- 
1.8.5.2 (Apple Git-48)
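For reference, the dedupe-and-guard pattern this patch converges on can be sketched in isolation. The Java sketch below is illustrative only: ResourceTracker, its Consumer<String> sink, and the main driver are hypothetical stand-ins for SparkClient's localJars/localFiles collections and SparkContext.addJar()/addFile(); they are not Hive or Spark APIs.

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Minimal sketch of the pattern in refreshLocalResources/addJars/addResources:
 * a shared, append-only set of registered resources behind a synchronized
 * entry point, so concurrent threads sharing one context never register the
 * same jar or file twice.
 */
public class ResourceTracker {
  private final Set<String> registered = new LinkedHashSet<>();
  private final Consumer<String> sink; // stand-in for sc.addJar()/sc.addFile()

  public ResourceTracker(Consumer<String> sink) {
    this.sink = sink;
  }

  /**
   * Synchronized for the same reason refreshLocalResources is: one shared
   * context, many threads. Blank input is a no-op, mirroring the
   * StringUtils.isBlank() guard the patch adds.
   */
  public synchronized void addAll(String commaSeparated) {
    if (commaSeparated == null || commaSeparated.trim().isEmpty()) {
      return;
    }
    for (String item : commaSeparated.split(",")) {
      String resource = item.trim();
      // Skip blanks and anything already registered; the context cannot
      // un-register resources (the TODO in the patch), so the set only grows.
      if (!resource.isEmpty() && registered.add(resource)) {
        sink.accept(resource);
      }
    }
  }

  public static void main(String[] args) {
    ResourceTracker jars = new ResourceTracker(j -> System.out.println("addJar " + j));
    jars.addAll("a.jar,b.jar");
    jars.addAll("b.jar, ,c.jar"); // b.jar is deduped, the blank is skipped, c.jar is added
  }
}

Note the design choice the patch makes and the sketch mirrors: synchronization sits on the single refresh entry point rather than on each helper, so the whole check-then-add sequence for one refresh is atomic with respect to other threads sharing the context.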