diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index d7cb111..0d0ae30 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -206,6 +206,7 @@
   public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
+  public static final String HIVE_ADDED_JARS = "hive.added.jars";
 
   /**
    * ReduceField:
@@ -363,6 +364,13 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
     Path path = null;
     InputStream in = null;
     try {
+      String addedJars = conf.get(HIVE_ADDED_JARS);
+      if (addedJars != null) {
+        ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
+        Thread.currentThread().setContextClassLoader(newLoader);
+      }
+
       path = getPlanPath(conf, name);
       LOG.info("PLAN PATH = " + path);
       assert path != null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 30a00a7..106506d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.MalformedURLException;
@@ -26,6 +29,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,15 +52,13 @@
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
 import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
 /**
  * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
  * wrap a spark job request and send to an remote SparkContext.
@@ -209,6 +211,15 @@ private JobStatusJob() {
     @Override
     public Serializable call(JobContext jc) throws Exception {
       JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+
+      // Add the jars to the current thread's classloader dynamically, and also record the jar
+      // paths in the JobConf, since Spark may deserialize MapWork on other threads.
+      List<String> addedJars = jc.getAddedJars();
+      if (addedJars != null && !addedJars.isEmpty()) {
+        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+      }
+
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
       SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
 
@@ -235,7 +246,6 @@ public Serializable call(JobContext jc) throws Exception {
 
       jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
       return null;
     }
-
   }
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index 00aa4ec..22b7e89 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -53,4 +53,9 @@
    */
   Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
 
+  /**
+   * Return all the jar paths that were added through AddJarJob.
+   */
+  List<String> getAddedJars();
+
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index 1eb3ff2..8d353ce 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -32,11 +33,13 @@
 
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
+  private final List<String> addedJars;
 
   public JobContextImpl(JavaSparkContext sc) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
+    addedJars = new CopyOnWriteArrayList<String>();
   }
 
@@ -57,6 +60,11 @@ public JavaSparkContext sc() {
     return monitoredJobs;
   }
 
+  @Override
+  public List<String> getAddedJars() {
+    return addedJars;
+  }
+
   void setMonitorCb(MonitorCallback cb) {
     monitorCb.set(cb);
   }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 5f9be65..dbd1627 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -529,6 +529,7 @@ public void run() {
     @Override
     public Serializable call(JobContext jc) throws Exception {
       jc.sc().addJar(path);
+      jc.getAddedJars().add(path);
       return null;
     }
 
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
new file mode 100644
index 0000000..f38e8d9
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.client;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SparkClientUtilities {
+  protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
+
+  /**
+   * Add new elements to the classpath.
+   *
+   * @param newPaths Array of classpath elements
+   */
+  public static void addToClassPath(String[] newPaths) throws Exception {
+    ClassLoader cloader = Thread.currentThread().getContextClassLoader();
+    URLClassLoader loader = (URLClassLoader) cloader;
+    List<URL> curPath = Lists.newArrayList(loader.getURLs());
+
+    boolean updated = false;
+    for (String newPath : newPaths) {
+      URL newUrl = urlFromPathString(newPath);
+      if (newUrl != null && !curPath.contains(newUrl)) {
+        curPath.add(newUrl);
+        updated = true;
+        LOG.info("Added jar[" + newUrl + "] to classpath.");
+      }
+    }
+
+    if (updated) {
+      URLClassLoader newLoader = new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
+      Thread.currentThread().setContextClassLoader(newLoader);
+    }
+  }
+
+  /**
+   * Create a URL from a string representing a path to a local file.
+   * The path string can be just a path, or can start with file:/ or file:///.
+   * @param path path string
+   * @return a URL for the path, or null if the path is malformed
+   */
+  private static URL urlFromPathString(String path) {
+    URL url = null;
+    try {
+      if (StringUtils.indexOf(path, "file:/") == 0) {
+        url = new URL(path);
+      } else {
+        url = new File(path).toURL();
+      }
+    } catch (Exception err) {
+      LOG.error("Bad URL " + path + ", ignoring path");
+    }
+    return url;
+  }
+}
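
Note (not part of the patch): the classloader pattern used in both SparkClientUtilities.addToClassPath and Utilities.getBaseWork can be exercised outside Hive. Below is a minimal standalone sketch; the jar path /tmp/my-udf.jar and the class name com.example.MyUdf are hypothetical placeholders, not anything this patch defines.

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    public class AddJarDemo {
      public static void main(String[] args) throws Exception {
        // Hypothetical local jar; point this at any real jar to try it.
        URL jarUrl = new File("/tmp/my-udf.jar").toURI().toURL();

        // Same steps addToClassPath performs for each added path: wrap the
        // current context classloader in a URLClassLoader that also sees the
        // jar, then install the wrapper on the current thread.
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        URLClassLoader augmented = new URLClassLoader(new URL[] { jarUrl }, parent);
        Thread.currentThread().setContextClassLoader(augmented);

        // Classes from the jar now resolve through the context classloader.
        // "com.example.MyUdf" is a placeholder class assumed to be in the jar.
        Class<?> udf = Class.forName("com.example.MyUdf", true,
            Thread.currentThread().getContextClassLoader());
        System.out.println("Loaded " + udf.getName());
      }
    }

Since the new loader keeps the previous one as its parent, everything that was visible before stays visible; the wrapper only adds URLs, and only for the thread that installed it.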
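That per-thread scope is why the patch records the jar list in two places. End to end: ADD JAR runs SparkClientImpl's AddJarJob, which ships the jar to executors via jc.sc().addJar(path) and now also records the path in the JobContext. Each later query job (JobStatusJob in RemoteHiveSparkClient) loads the recorded jars into its own thread's classloader and joins them, ';'-separated, into the hive.added.jars property of the JobConf. Utilities.getBaseWork then splits that property and augments the context classloader before deserializing the plan, covering threads that never ran AddJarJob. The addedJars list is a CopyOnWriteArrayList, presumably because AddJarJob and query jobs can touch it from different threads on the remote driver.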