diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index f1d65ea..6701a29 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -224,7 +224,8 @@ public Serializable call(JobContext jc) throws Exception {
       // may need to load classes from this jar in other threads.
       List<String> addedJars = jc.getAddedJars();
       if (addedJars != null && !addedJars.isEmpty()) {
-        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]),
+            localJobConf, jc.getLocalTmpDir());
         localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
       }

diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index 22b7e89..36e252c 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.hive.spark.client;
 
+import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,4 +59,9 @@
    */
   List<String> getAddedJars();
 
+  /**
+   * Returns a local tmp dir specific to the context
+   */
+  File getLocalTmpDir();
+
 }

diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index 8d353ce..164d90a 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.hive.spark.client;
 
+import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,12 +35,14 @@
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
   private final List<String> addedJars;
+  private final File localTmpDir;
 
-  public JobContextImpl(JavaSparkContext sc) {
+  public JobContextImpl(JavaSparkContext sc, File localTmpDir) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
     addedJars = new CopyOnWriteArrayList<String>();
+    this.localTmpDir = localTmpDir;
   }
 
@@ -65,6 +68,11 @@ public JavaSparkContext sc() {
     return addedJars;
   }
 
+  @Override
+  public File getLocalTmpDir() {
+    return localTmpDir;
+  }
+
   void setMonitorCb(MonitorCallback cb) {
     monitorCb.set(cb);
   }
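Reviewer note: the changes above thread a per-driver scratch directory through the public JobContext interface, so any Job running in the remote driver can reach it. A minimal sketch of a consumer follows; the TmpDirProbeJob class is illustrative only and not part of this patch (it assumes the org.apache.hive.spark.client Job interface, whose call(JobContext) signature appears in the first hunk).

import java.io.File;
import java.io.Serializable;

import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;

// Hypothetical job: only JobContext.getLocalTmpDir() comes from this patch.
public class TmpDirProbeJob implements Job<Serializable> {
  @Override
  public Serializable call(JobContext jc) throws Exception {
    // Per-driver scratch dir, created by RemoteDriver and deleted on shutdown.
    File tmpDir = jc.getLocalTmpDir();
    return tmpDir.getAbsolutePath();
  }
}
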
diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 4e15902..b77c9e8 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -18,9 +18,12 @@
 package org.apache.hive.spark.client;
 
 import com.google.common.base.Throwables;
+import com.google.common.io.Files;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.nio.NioEventLoopGroup;
 
+import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
@@ -34,6 +37,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.rpc.Rpc;
@@ -85,6 +89,8 @@
   private final NioEventLoopGroup egroup;
   private final Rpc clientRpc;
   private final DriverProtocol protocol;
+  // a local temp dir specific to this driver
+  private final File localTmpDir;
 
   // Used to queue up requests while the SparkContext is being created.
   private final List<JobWrapper<?>> jobQueue = Lists.newLinkedList();
@@ -98,6 +104,7 @@ private RemoteDriver(String[] args) throws Exception {
     this.activeJobs = Maps.newConcurrentMap();
     this.jcLock = new Object();
     this.shutdownLock = new Object();
+    localTmpDir = Files.createTempDir();
 
     SparkConf conf = new SparkConf();
     String serverAddress = null;
@@ -162,7 +169,7 @@ public void rpcClosed(Rpc rpc) {
           JavaSparkContext sc = new JavaSparkContext(conf);
           sc.sc().addSparkListener(new ClientListener());
           synchronized (jcLock) {
-            jc = new JobContextImpl(sc);
+            jc = new JobContextImpl(sc, localTmpDir);
             jcLock.notifyAll();
           }
         } catch (Exception e) {
@@ -188,6 +195,11 @@ private void run() throws InterruptedException {
       }
     }
     executor.shutdownNow();
+    try {
+      FileUtils.deleteDirectory(localTmpDir);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete local tmp dir: " + localTmpDir, e);
+    }
   }
 
   private void submit(JobWrapper<?> job) {

diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 5de7c2e..71e432d 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -250,6 +250,7 @@ public void run() {
         if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
           throw new IOException("Cannot change permissions of job properties file.");
         }
+        properties.deleteOnExit();
 
         Properties allProps = new Properties();
         // first load the defaults from spark-defaults.conf if available
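Reviewer note: RemoteDriver now creates the scratch dir with Guava's Files.createTempDir() in its constructor and removes it recursively with commons-io's FileUtils.deleteDirectory() after the executor drains, logging rather than failing if the delete throws; SparkClientImpl additionally marks the generated job properties file deleteOnExit() so it does not outlive the client JVM. The standalone sketch below mirrors that create-then-clean-up pattern under the assumption that guava and commons-io are on the classpath; the class and its printed output are illustrative, not part of the patch.

import java.io.File;
import java.io.IOException;

import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;

public class TmpDirLifecycle {
  public static void main(String[] args) {
    // Same call RemoteDriver makes at startup.
    File localTmpDir = Files.createTempDir();
    try {
      // Downloaded jars, properties files, etc. would land here.
      System.out.println("working under " + localTmpDir);
    } finally {
      try {
        // Recursive delete on shutdown, as in RemoteDriver.run().
        FileUtils.deleteDirectory(localTmpDir);
      } catch (IOException e) {
        // Log and move on rather than failing shutdown.
        System.err.println("Failed to delete local tmp dir: " + localTmpDir);
      }
    }
  }
}
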
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 93fceaf..879f8a4 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -28,6 +28,9 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 public class SparkClientUtilities {
   protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
@@ -37,20 +40,22 @@
    *
    * @param newPaths Array of classpath elements
    */
-  public static void addToClassPath(String[] newPaths) throws Exception {
+  public static void addToClassPath(String[] newPaths, Configuration conf, File localTmpDir)
+      throws Exception {
     ClassLoader cloader = Thread.currentThread().getContextClassLoader();
     URLClassLoader loader = (URLClassLoader) cloader;
     List<URL> curPath = Lists.newArrayList(loader.getURLs());
 
     for (String newPath : newPaths) {
-      URL newUrl = urlFromPathString(newPath);
+      URL newUrl = urlFromPathString(newPath, conf, localTmpDir);
       if (newUrl != null && !curPath.contains(newUrl)) {
         curPath.add(newUrl);
         LOG.info("Added jar[" + newUrl + "] to classpath.");
       }
     }
 
-    URLClassLoader newLoader = new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
+    URLClassLoader newLoader =
+        new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
     Thread.currentThread().setContextClassLoader(newLoader);
   }
 
@@ -60,16 +65,24 @@ public static void addToClassPath(String[] newPaths) throws Exception {
    * @param path path string
    * @return
    */
-  private static URL urlFromPathString(String path) {
+  private static URL urlFromPathString(String path, Configuration conf, File localTmpDir) {
     URL url = null;
     try {
       if (StringUtils.indexOf(path, "file:/") == 0) {
         url = new URL(path);
+      } else if (StringUtils.indexOf(path, "hdfs:/") == 0) {
+        Path remoteFile = new Path(path);
+        Path localFile =
+            new Path(localTmpDir.getAbsolutePath() + File.separator + remoteFile.getName());
+        LOG.info("Copying " + remoteFile + " to " + localFile);
+        FileSystem fs = remoteFile.getFileSystem(conf);
+        fs.copyToLocalFile(remoteFile, localFile);
+        return urlFromPathString(localFile.toString(), conf, localTmpDir);
       } else {
         url = new File(path).toURL();
       }
     } catch (Exception err) {
-      LOG.error("Bad URL " + path + ", ignoring path");
+      LOG.error("Bad URL " + path + ", ignoring path", err);
     }
     return url;
   }
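Reviewer note: with the new hdfs:/ branch, urlFromPathString() first copies the remote jar into the driver's temp dir via FileSystem.copyToLocalFile() and then recurses on the local path, so the jar reaches the classloader as an ordinary local URL; the added err argument to LOG.error() also preserves the stack trace that was previously swallowed. A hedged usage sketch of the widened addToClassPath() entry point follows; the jar path is made up and localTmpDir stands in for jc.getLocalTmpDir().

import java.io.File;

import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.spark.client.SparkClientUtilities;

public class AddJarExample {
  public static void main(String[] args) throws Exception {
    // Picks up fs.defaultFS etc. for HDFS access.
    Configuration conf = new Configuration();
    // Stand-in for the driver-scoped dir from JobContext.getLocalTmpDir().
    File localTmpDir = Files.createTempDir();
    // file:/ URLs and bare local paths are used directly; hdfs:/ paths are
    // downloaded under localTmpDir first. The path below is illustrative.
    SparkClientUtilities.addToClassPath(
        new String[] { "hdfs:///tmp/udfs/my-udf.jar" }, conf, localTmpDir);
  }
}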