diff --git a/pom.xml b/pom.xml index 7887e23..3c0c2a6 100644 --- a/pom.xml +++ b/pom.xml @@ -160,7 +160,7 @@ 4.0.4 0.5.2 2.2.0 - 1.3.1 + 1.4.0 2.10 2.10.4 1.1 diff --git a/ql/pom.xml b/ql/pom.xml index b076c26..663f7be 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -469,6 +469,11 @@ ${spark.version} true + + com.sun.jersey + jersey-servlet + test + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 024e05c..93d0254 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -383,6 +383,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { ClassLoader loader = Thread.currentThread().getContextClassLoader(); ClassLoader newLoader = addToClassPath(loader, addedJars.split(";")); Thread.currentThread().setContextClassLoader(newLoader); + runtimeSerializationKryo.get().setClassLoader(newLoader); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java index ff9fb85..f1d7368 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java @@ -80,4 +80,8 @@ public static JobConf deserializeJobConf(byte[] buffer) { return conf; } + public static void setClassLoader(ClassLoader classLoader) { + Utilities.sparkSerializationKryo.get().setClassLoader(classLoader); + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 8b15099..4073d2b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -232,6 +232,7 @@ public Serializable call(JobContext jc) throws Exception { Set addedJars = jc.getAddedJars(); if (addedJars != null && !addedJars.isEmpty()) { SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir()); + KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader()); localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";")); } diff --git a/spark-client/pom.xml b/spark-client/pom.xml index 0d4b8fa..1d05720 100644 --- a/spark-client/pom.xml +++ b/spark-client/pom.xml @@ -70,6 +70,11 @@ mockito-all test + + com.sun.jersey + jersey-servlet + test + diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java index b079ee2..589436d 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java @@ -43,21 +43,24 @@ */ public static void addToClassPath(Set newPaths, Configuration conf, File localTmpDir) throws Exception { - ClassLoader cloader = Thread.currentThread().getContextClassLoader(); - URLClassLoader loader = (URLClassLoader) cloader; + URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader(); List curPath = Lists.newArrayList(loader.getURLs()); + boolean newPathAdded = false; for (String newPath : newPaths) { URL newUrl = urlFromPathString(newPath, conf, localTmpDir); if (newUrl != null && !curPath.contains(newUrl)) { curPath.add(newUrl); LOG.info("Added jar[" + newUrl + "] to classpath."); + newPathAdded = true; } } - URLClassLoader newLoader = - new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader); - Thread.currentThread().setContextClassLoader(newLoader); + if (newPathAdded) { + URLClassLoader newLoader = + new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader); + Thread.currentThread().setContextClassLoader(newLoader); + } } /** diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java index d33ad7e..ea83125 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java @@ -168,7 +168,7 @@ public void call(SparkClient client) throws Exception { future.get(TIMEOUT, TimeUnit.SECONDS); MetricsCollection metrics = future.getMetrics(); assertEquals(1, metrics.getJobIds().size()); - assertTrue(metrics.getAllMetrics().executorRunTime > 0L); + assertTrue(metrics.getAllMetrics().executorRunTime >= 0L); verify(listener).onSparkJobStarted(same(future), eq(metrics.getJobIds().iterator().next())); @@ -179,7 +179,7 @@ public void call(SparkClient client) throws Exception { MetricsCollection metrics2 = future2.getMetrics(); assertEquals(1, metrics2.getJobIds().size()); assertFalse(Objects.equal(metrics.getJobIds(), metrics2.getJobIds())); - assertTrue(metrics2.getAllMetrics().executorRunTime > 0L); + assertTrue(metrics2.getAllMetrics().executorRunTime >= 0L); verify(listener2).onSparkJobStarted(same(future2), eq(metrics2.getJobIds().iterator().next())); }