diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
index 82ec668..b4a5432 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
@@ -36,6 +36,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
 import org.apache.spark.SparkFiles;
 
 import java.io.BufferedInputStream;
@@ -330,6 +332,7 @@ public void process(Object row, int tag) throws HiveException {
     // initialize the user's process only when you receive the first row
     if (firstRow) {
       firstRow = false;
+      SparkConf sparkConf = null;
       try {
         String[] cmdArgs = splitArgs(conf.getScriptCmd());
 
@@ -342,6 +345,7 @@ public void process(Object row, int tag) throws HiveException {
         // In spark local mode, we need to search added files in root directory.
         if (HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+          sparkConf = SparkEnv.get().conf();
           finder.prependPathComponent(SparkFiles.getRootDirectory());
         }
         File f = finder.getAbsolutePath(prog);
@@ -372,6 +376,17 @@ public void process(Object row, int tag) throws HiveException {
         String idEnvVarVal = getOperatorId();
         env.put(safeEnvVarName(idEnvVarName), idEnvVarVal);
 
+        // For spark, in non-local mode, any added dependencies are stored at
+        // SparkFiles::getRootDirectory, which is the executor's working directory.
+        // In local mode, we need to manually point the process's working directory to it,
+        // in order to make the dependencies accessible.
+        if (sparkConf != null) {
+          String master = sparkConf.get("spark.master");
+          if (master.equals("local") || master.startsWith("local[")) {
+            pb.directory(new File(SparkFiles.getRootDirectory()));
+          }
+        }
+
         scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs);
 
         DataOutputStream scriptOut = new DataOutputStream(
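The ScriptOperator hunks rely on two assumptions: SparkEnv.get() is expected to be non-null here because the operator runs inside a Spark task whenever the execution engine is "spark", and only the "local"/"local[N]" masters run the executor inside the driver JVM, where SparkFiles.getRootDirectory() points at a temp directory rather than the process's working directory. A minimal sketch of the master check in isolation (the LocalMasterCheck class and isLocalMaster helper are hypothetical, not part of the patch):

    // Hypothetical helper isolating the spark.master check used above.
    public final class LocalMasterCheck {
      // "local" and "local[N]"/"local[*]" run executors inside the driver JVM;
      // "local-cluster[...]" and cluster masters spawn separate executor
      // processes whose working directory already contains the added files.
      static boolean isLocalMaster(String master) {
        return master.equals("local") || master.startsWith("local[");
      }
    }
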
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 7d43160..976361c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -273,11 +273,11 @@ public Serializable call(JobContext jc) throws Exception {
 
       // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
       // may need to load classes from this jar in other threads.
-      Set<String> addedJars = jc.getAddedJars();
+      Map<String, Long> addedJars = jc.getAddedJars();
       if (addedJars != null && !addedJars.isEmpty()) {
         SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
         KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
-        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
       }
 
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index cf2c3bc..0268469 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -22,8 +22,6 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
-import java.util.UUID;
-import java.util.Collection;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
@@ -91,11 +89,11 @@ public static URI getURI(String path) throws URISyntaxException {
    */
  public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
    Path localFile = new Path(source.getPath());
-    // give the uploaded file a UUID
-    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf),
-        UUID.randomUUID() + "-" + getFileName(source));
+    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
     FileSystem fileSystem = FileSystem.get(conf);
-    fileSystem.copyFromLocalFile(localFile, remoteFile);
+    // Overwrite if the remote file already exists. Whether executors pick up
+    // the new file is up to Spark, i.e. spark.files.overwrite.
+    fileSystem.copyFromLocalFile(false, true, localFile, remoteFile);
     Path fullPath = fileSystem.getFileStatus(remoteFile).getPath();
     return fullPath.toUri();
   }
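Dropping the UUID prefix gives a re-added file a stable HDFS path, which lets the four-argument Hadoop FileSystem#copyFromLocalFile overload replace the stale copy in place. A short sketch of the new upload semantics (the UploadSketch class is illustrative; conf is assumed to resolve to the session's default FileSystem):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: mirrors the patched upload step in uploadToHDFS.
    public final class UploadSketch {
      static void upload(Configuration conf, Path localFile, Path remoteFile) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        // delSrc=false keeps the local file; overwrite=true replaces a remote
        // copy left behind by an earlier ADD JAR/FILE of the same name.
        fs.copyFromLocalFile(false, true, localFile, remoteFile);
      }
    }

Whether executors actually pick up the replaced file is governed on the Spark side by spark.files.overwrite, as the comment in the hunk notes.
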
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index af6332e..c9c975b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -55,9 +55,9 @@
   Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
 
   /**
-   * Return all added jar path which added through AddJarJob.
+   * Return all added jar paths and their timestamps, added through AddJarJob.
    */
-  Set<String> getAddedJars();
+  Map<String, Long> getAddedJars();
 
   /**
    * Returns a local tmp dir specific to the context
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index beed8a3..b73bcd7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -18,12 +18,10 @@ package org.apache.hive.spark.client;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -35,14 +33,14 @@
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
-  private final Set<String> addedJars;
+  private final Map<String, Long> addedJars;
   private final File localTmpDir;
 
   public JobContextImpl(JavaSparkContext sc, File localTmpDir) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
-    addedJars = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+    addedJars = new ConcurrentHashMap<>();
     this.localTmpDir = localTmpDir;
   }
 
@@ -65,7 +63,7 @@ public JavaSparkContext sc() {
   }
 
   @Override
-  public Set<String> getAddedJars() {
+  public Map<String, Long> getAddedJars() {
     return addedJars;
   }
 
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 2546a46..43dd5da 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -612,7 +612,7 @@ public Serializable call(JobContext jc) throws Exception {
       jc.sc().addJar(path);
       // Following remote job may refer to classes in this jar, and the remote job would be executed
       // in a different thread, so we add this jar path to JobContext for further usage.
-      jc.getAddedJars().add(path);
+      jc.getAddedJars().put(path, System.currentTimeMillis());
       return null;
     }
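With getAddedJars() now returning a ConcurrentHashMap<String, Long>, re-adding a jar is a plain put on the same key: the map keeps one entry per path whose value is the latest add time, and no extra synchronization is needed beyond the map itself. A small illustration (the class and jar path are made up for the example):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative only: shows the single-entry, latest-timestamp behavior
    // that SparkClientImpl relies on when AddJarJob runs more than once.
    public final class AddedJarsExample {
      public static void main(String[] args) throws InterruptedException {
        Map<String, Long> addedJars = new ConcurrentHashMap<>();
        addedJars.put("hdfs:///tmp/my-udf.jar", System.currentTimeMillis()); // first ADD JAR
        Thread.sleep(5);
        addedJars.put("hdfs:///tmp/my-udf.jar", System.currentTimeMillis()); // re-add bumps the value
        System.out.println(addedJars.size()); // 1
      }
    }
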
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 589436d..bbbd97b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -24,7 +24,8 @@
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -35,20 +36,21 @@
 public class SparkClientUtilities {
   protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
 
+  private static final Map<String, Long> downloadedFiles = new ConcurrentHashMap<>();
+
   /**
    * Add new elements to the classpath.
    *
-   * @param newPaths Set of classpath elements
+   * @param newPaths Map of classpath elements and their corresponding timestamps
    */
-  public static void addToClassPath(Set<String> newPaths, Configuration conf, File localTmpDir)
+  public static void addToClassPath(Map<String, Long> newPaths, Configuration conf, File localTmpDir)
       throws Exception {
     URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
     List<URL> curPath = Lists.newArrayList(loader.getURLs());
 
     boolean newPathAdded = false;
-    for (String newPath : newPaths) {
-      URL newUrl = urlFromPathString(newPath, conf, localTmpDir);
+    for (Map.Entry<String, Long> entry : newPaths.entrySet()) {
+      URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir);
       if (newUrl != null && !curPath.contains(newUrl)) {
         curPath.add(newUrl);
         LOG.info("Added jar[" + newUrl + "] to classpath.");
@@ -69,7 +71,8 @@ public static void addToClassPath(Set<String> newPaths, Configuration conf, File
    * @param path  path string
    * @return
    */
-  private static URL urlFromPathString(String path, Configuration conf, File localTmpDir) {
+  private static URL urlFromPathString(String path, Long timeStamp,
+      Configuration conf, File localTmpDir) {
     URL url = null;
     try {
       if (StringUtils.indexOf(path, "file:/") == 0) {
@@ -78,12 +81,17 @@ private static URL urlFromPathString(String path, Configuration conf, File local
         Path remoteFile = new Path(path);
         Path localFile =
             new Path(localTmpDir.getAbsolutePath() + File.separator + remoteFile.getName());
-        if (!new File(localFile.toString()).exists()) {
+        Long currentTS = downloadedFiles.get(path);
+        if (currentTS == null) {
+          currentTS = -1L;
+        }
+        if (!new File(localFile.toString()).exists() || currentTS < timeStamp) {
           LOG.info("Copying " + remoteFile + " to " + localFile);
           FileSystem remoteFS = remoteFile.getFileSystem(conf);
           remoteFS.copyToLocalFile(remoteFile, localFile);
+          downloadedFiles.put(path, timeStamp);
         }
-        return urlFromPathString(localFile.toString(), conf, localTmpDir);
+        return urlFromPathString(localFile.toString(), timeStamp, conf, localTmpDir);
       } else {
         url = new File(path).toURL();
       }
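The downloadedFiles cache is where the timestamps pay off: a remote jar is copied to the local tmp dir only when no local copy exists or the cached timestamp is older than the one carried with the add, so a re-added jar is re-localized while untouched jars are never copied again. A condensed sketch of that decision, using the patch's names as parameters (a missing cache entry is treated as -1, i.e. always stale):

    import java.io.File;
    import java.util.Map;

    final class StalenessRule {
      // Condensed restatement of the check in urlFromPathString; parameters
      // mirror the patch's field (downloadedFiles) and locals.
      static boolean mustCopy(Map<String, Long> downloadedFiles, String path,
          File localFile, long timeStamp) {
        Long currentTS = downloadedFiles.get(path);
        if (currentTS == null) {
          currentTS = -1L; // never localized in this process
        }
        return !localFile.exists() || currentTS < timeStamp;
      }
    }

One detail worth noting: the local file name is derived from remoteFile.getName(), so two jars with the same base name in different remote directories would map to the same file in localTmpDir; re-adds of the same path, by contrast, are exactly the case the timestamp check handles.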