diff --git a/pom.xml b/pom.xml
index 7887e23..3c0c2a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,7 +160,7 @@
4.0.4
0.5.2
2.2.0
- 1.3.1
+ 1.4.0
2.10
2.10.4
1.1
diff --git a/ql/pom.xml b/ql/pom.xml
index b076c26..663f7be 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -469,6 +469,11 @@
${spark.version}
true
+
+ com.sun.jersey
+ jersey-servlet
+ test
+
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 024e05c..93d0254 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -383,6 +383,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
Thread.currentThread().setContextClassLoader(newLoader);
+ runtimeSerializationKryo.get().setClassLoader(newLoader);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
index ff9fb85..f1d7368 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
@@ -80,4 +80,8 @@ public static JobConf deserializeJobConf(byte[] buffer) {
return conf;
}
+ public static void setClassLoader(ClassLoader classLoader) {
+ Utilities.sparkSerializationKryo.get().setClassLoader(classLoader);
+ }
+
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 8b15099..4073d2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -232,6 +232,7 @@ public Serializable call(JobContext jc) throws Exception {
Set addedJars = jc.getAddedJars();
if (addedJars != null && !addedJars.isEmpty()) {
SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
+ KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
}
diff --git a/spark-client/pom.xml b/spark-client/pom.xml
index 0d4b8fa..1d05720 100644
--- a/spark-client/pom.xml
+++ b/spark-client/pom.xml
@@ -70,6 +70,11 @@
mockito-all
test
+
+ com.sun.jersey
+ jersey-servlet
+ test
+
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index b079ee2..589436d 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -43,21 +43,24 @@
*/
public static void addToClassPath(Set newPaths, Configuration conf, File localTmpDir)
throws Exception {
- ClassLoader cloader = Thread.currentThread().getContextClassLoader();
- URLClassLoader loader = (URLClassLoader) cloader;
+ URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
List curPath = Lists.newArrayList(loader.getURLs());
+ boolean newPathAdded = false;
for (String newPath : newPaths) {
URL newUrl = urlFromPathString(newPath, conf, localTmpDir);
if (newUrl != null && !curPath.contains(newUrl)) {
curPath.add(newUrl);
LOG.info("Added jar[" + newUrl + "] to classpath.");
+ newPathAdded = true;
}
}
- URLClassLoader newLoader =
- new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
- Thread.currentThread().setContextClassLoader(newLoader);
+ if (newPathAdded) {
+ URLClassLoader newLoader =
+ new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
+ Thread.currentThread().setContextClassLoader(newLoader);
+ }
}
/**
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index d33ad7e..ea83125 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -168,7 +168,7 @@ public void call(SparkClient client) throws Exception {
future.get(TIMEOUT, TimeUnit.SECONDS);
MetricsCollection metrics = future.getMetrics();
assertEquals(1, metrics.getJobIds().size());
- assertTrue(metrics.getAllMetrics().executorRunTime > 0L);
+ assertTrue(metrics.getAllMetrics().executorRunTime >= 0L);
verify(listener).onSparkJobStarted(same(future),
eq(metrics.getJobIds().iterator().next()));
@@ -179,7 +179,7 @@ public void call(SparkClient client) throws Exception {
MetricsCollection metrics2 = future2.getMetrics();
assertEquals(1, metrics2.getJobIds().size());
assertFalse(Objects.equal(metrics.getJobIds(), metrics2.getJobIds()));
- assertTrue(metrics2.getAllMetrics().executorRunTime > 0L);
+ assertTrue(metrics2.getAllMetrics().executorRunTime >= 0L);
verify(listener2).onSparkJobStarted(same(future2),
eq(metrics2.getJobIds().iterator().next()));
}