diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 30a00a7..75e5e5a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.MalformedURLException;
@@ -48,15 +51,13 @@
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
 import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
 /**
  * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
  * wraps a Spark job request and sends it to a remote SparkContext.
@@ -209,6 +210,12 @@ private JobStatusJob() {
     @Override
     public Serializable call(JobContext jc) throws Exception {
       JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+
+      List<String> addedJars = jc.getAddedJars();
+      if (addedJars != null && !addedJars.isEmpty()) {
+        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+      }
+
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
       SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
@@ -235,7 +242,6 @@ public Serializable call(JobContext jc) throws Exception {
       jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
       return null;
     }
-
   }
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index 00aa4ec..22b7e89 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -53,4 +53,9 @@
    */
   Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
 
+  /**
+   * Return all jar paths that have been added through AddJarJob.
+   */
+  List<String> getAddedJars();
+
 }
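Note on the hunks above: the classpath must be extended with the user-added jars before the SparkWork plan is deserialized, because the serialized plan may reference classes (for example, UDF classes registered via ADD JAR) that exist only in those jars and are not on the remote driver's original classpath. A minimal sketch of the intended ordering inside JobStatusJob.call(), using only calls that appear in this patch:

    // First make the added jars visible to this thread's classloader ...
    List<String> addedJars = jc.getAddedJars();
    if (addedJars != null && !addedJars.isEmpty()) {
      SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
    }
    // ... and only then deserialize the plan, so that any classes it
    // references can be resolved from those jars.
    SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);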
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index 1eb3ff2..8d353ce 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -32,11 +33,13 @@
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
+  private final List<String> addedJars;
 
   public JobContextImpl(JavaSparkContext sc) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
+    addedJars = new CopyOnWriteArrayList<String>();
   }
 
@@ -57,6 +60,11 @@ public JavaSparkContext sc() {
     return monitoredJobs;
   }
 
+  @Override
+  public List<String> getAddedJars() {
+    return addedJars;
+  }
+
   void setMonitorCb(MonitorCallback cb) {
     monitorCb.set(cb);
   }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 5f9be65..dbd1627 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -529,6 +529,7 @@ public void run() {
     @Override
     public Serializable call(JobContext jc) throws Exception {
       jc.sc().addJar(path);
+      jc.getAddedJars().add(path);
       return null;
     }
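Note on AddJarJob: the job now does two things on the remote driver. jc.sc().addJar(path) hands the jar to Spark so executors can fetch it, while jc.getAddedJars().add(path) records the path in the JobContext so that jobs submitted later (such as JobStatusJob above) can put it on their own classloader. A CopyOnWriteArrayList fits this access pattern: additions are rare, every job submission reads the list, and iteration never throws ConcurrentModificationException. For orientation, the enclosing client-side entry point probably looks like the sketch below; this method is not part of the diff and its exact shape is an assumption:

    // Hypothetical shape of SparkClientImpl.addJar(); only the line
    // inside AddJarJob.call() added in the hunk above is in this patch.
    public Future<?> addJar(URI uri) {
      return run(new AddJarJob(uri.toString()));
    }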
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
new file mode 100644
index 0000000..6695352
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.client;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SparkClientUtilities {
+  protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
+
+  /**
+   * Add new elements to the classpath.
+   *
+   * @param newPaths Array of classpath elements
+   */
+  public static void addToClassPath(String[] newPaths) throws Exception {
+    ClassLoader cloader = Thread.currentThread().getContextClassLoader();
+    URLClassLoader loader = (URLClassLoader) cloader;
+    List<URL> curPath = Arrays.asList(loader.getURLs());
+    ArrayList<URL> newPath = new ArrayList<URL>();
+
+    // get a mutable list with the current classpath components
+    for (URL onePath : curPath) {
+      newPath.add(onePath);
+    }
+    curPath = newPath;
+
+    for (String onestr : newPaths) {
+      URL oneurl = urlFromPathString(onestr);
+      if (oneurl != null && !curPath.contains(oneurl)) {
+        curPath.add(oneurl);
+      }
+    }
+
+    URLClassLoader newLoader = new URLClassLoader(curPath.toArray(new URL[0]), loader);
+    Thread.currentThread().setContextClassLoader(newLoader);
+  }
+
+  /**
+   * Create a URL from a string representing a path to a local file.
+   * The path string can be just a path, or can start with file:/ or file:///.
+   *
+   * @param onestr path string
+   * @return a URL for the given path, or null if the string cannot be parsed
+   */
+  private static URL urlFromPathString(String onestr) {
+    URL oneurl = null;
+    try {
+      if (StringUtils.indexOf(onestr, "file:/") == 0) {
+        oneurl = new URL(onestr);
+      } else {
+        oneurl = new File(onestr).toURL();
+      }
+    } catch (Exception err) {
+      LOG.error("Bad URL " + onestr + ", ignoring path");
+    }
+    return oneurl;
+  }
+}
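Design note on SparkClientUtilities.addToClassPath: it never mutates an existing classloader. Each call wraps the current context classloader in a new URLClassLoader that delegates to it, so classes loaded earlier remain valid and repeated calls simply grow the delegation chain. The cast to URLClassLoader assumes the thread's context classloader is URL-based, which holds on the Java 7/8 runtimes current at the time of this patch, where the application classloader extends URLClassLoader. A minimal usage sketch, with a made-up jar path and class name:

    // After this call, the current thread can resolve classes packed in the jar.
    SparkClientUtilities.addToClassPath(new String[] { "/tmp/my-udf.jar" });
    Class<?> udf = Class.forName("com.example.MyUdf", true,
        Thread.currentThread().getContextClassLoader());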