diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 6340d1c..f583aaf 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -996,4 +996,5 @@ spark.query.files=add_part_multiple.q, \
   vectorized_shufflejoin.q, \
   vectorized_string_funcs.q, \
   vectorized_timestamp_funcs.q, \
-  windowing.q
+  windowing.q, \
+  lateral_view_explode2.q
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 9d9f4e6..404fbef 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -207,6 +207,7 @@
   public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
+  public static final String HIVE_ADDED_JARS = "hive.added.jars";
 
   /**
    * ReduceField:
@@ -364,6 +365,18 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
     Path path = null;
     InputStream in = null;
     try {
+      String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
+      if (engine.equals("spark")) {
+        // TODO: add the jars to the current thread's context classloader, since this may be invoked
+        // from Spark driver threads; this should be unnecessary once SPARK-5377 is resolved.
+        String addedJars = conf.get(HIVE_ADDED_JARS);
+        if (addedJars != null && !addedJars.isEmpty()) {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          ClassLoader newLoader = addToClassPath(loader, addedJars.split(";"));
+          Thread.currentThread().setContextClassLoader(newLoader);
+        }
+      }
+
       path = getPlanPath(conf, name);
       LOG.info("PLAN PATH = " + path);
       assert path != null;
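
The getBaseWork() change above delegates to the existing Utilities.addToClassPath(loader, paths) helper. As a rough, hypothetical illustration of what that call is expected to produce (not the actual Hive implementation; names here are illustrative), the ';'-separated paths carried in hive.added.jars end up visible through a URLClassLoader chained onto the current context classloader:

    // Illustrative sketch only; class and method names are hypothetical.
    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    class AddedJarsClassLoaderSketch {
      // Mirrors what the getBaseWork() change above expects from Utilities.addToClassPath():
      // a classloader that can also see the jar paths listed in hive.added.jars.
      static ClassLoader addToClassPath(ClassLoader parent, String[] jarPaths) throws Exception {
        URL[] urls = new URL[jarPaths.length];
        for (int i = 0; i < jarPaths.length; i++) {
          urls[i] = new File(jarPaths[i]).toURI().toURL();
        }
        return new URLClassLoader(urls, parent);
      }
    }
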
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index a4a166a..41a2ab7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.MalformedURLException;
@@ -26,6 +29,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,15 +52,13 @@
 import org.apache.hive.spark.client.JobHandle;
 import org.apache.hive.spark.client.SparkClient;
 import org.apache.hive.spark.client.SparkClientFactory;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
 /**
  * RemoteSparkClient is a wrapper of {@link org.apache.hive.spark.client.SparkClient}, which
  * wrap a spark job request and send to an remote SparkContext.
@@ -208,6 +210,15 @@ private JobStatusJob() {
 
     @Override
     public Serializable call(JobContext jc) throws Exception {
       JobConf localJobConf = KryoSerializer.deserializeJobConf(jobConfBytes);
+
+      // Add the jars to the current thread's classloader dynamically, and record the jar paths in
+      // the JobConf, since Spark may need to load classes from these jars in other threads.
+      List<String> addedJars = jc.getAddedJars();
+      if (addedJars != null && !addedJars.isEmpty()) {
+        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+      }
+
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
       SparkWork localSparkWork = KryoSerializer.deserialize(sparkWorkBytes, SparkWork.class);
@@ -234,7 +245,6 @@ public Serializable call(JobContext jc) throws Exception {
 
       jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
       return null;
     }
-
   }
 }
diff --git ql/src/test/queries/clientpositive/lateral_view_explode2.q ql/src/test/queries/clientpositive/lateral_view_explode2.q
new file mode 100644
index 0000000..3c48027
--- /dev/null
+++ ql/src/test/queries/clientpositive/lateral_view_explode2.q
@@ -0,0 +1,9 @@
+add jar ${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:hive.version}/hive-contrib-${system:hive.version}.jar;
+
+CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2';
+
+EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3;
+
+SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3;
+
+DROP TEMPORARY FUNCTION explode2;
\ No newline at end of file
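
The test exercises the hive-contrib UDTF explode2, loaded from the jar added above. Judging from the golden output below, it takes an array and, for each element, emits one row with two identical columns (hence the rows 1 1, 2 2, 3 3). A minimal, hypothetical sketch of a UDTF with that behavior follows; it is not the hive-contrib source, and the class name is illustrative:

    // Illustrative only; the real implementation is GenericUDTFExplode2 in hive-contrib.
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    public class ExplodeTwoColumnsSketch extends GenericUDTF {
      private transient ListObjectInspector listOI;

      @Override
      public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1 || args[0].getCategory() != ObjectInspector.Category.LIST) {
          throw new UDFArgumentException("explode2() takes a single array argument");
        }
        listOI = (ListObjectInspector) args[0];
        List<String> names = new ArrayList<String>();
        List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>();
        names.add("col1");
        inspectors.add(listOI.getListElementObjectInspector());
        names.add("col2");
        inspectors.add(listOI.getListElementObjectInspector());
        return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
      }

      @Override
      public void process(Object[] args) throws HiveException {
        List<?> elements = listOI.getList(args[0]);
        if (elements == null) {
          return;
        }
        for (Object element : elements) {
          // Emit each element twice, once per output column, matching the (1,1) (2,2) (3,3) rows below.
          forward(new Object[] { element, element });
        }
      }

      @Override
      public void close() throws HiveException {
        // nothing to clean up
      }
    }
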
diff --git ql/src/test/results/clientpositive/lateral_view_explode2.q.out ql/src/test/results/clientpositive/lateral_view_explode2.q.out
new file mode 100644
index 0000000..e2d24ee
--- /dev/null
+++ ql/src/test/results/clientpositive/lateral_view_explode2.q.out
@@ -0,0 +1,102 @@
+PREHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: explode2
+PREHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+            Lateral View Forward
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+              Select Operator
+                Statistics: Num rows: 500 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                Lateral View Join Operator
+                  outputColumnNames: _col5, _col6
+                  Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    keys: _col5 (type: int), _col6 (type: int)
+                    mode: hash
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int), _col1 (type: int)
+                      sort order: ++
+                      Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+              Select Operator
+                expressions: array(1,2,3) (type: array<int>)
+                outputColumnNames: _col0
+                Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                UDTF Operator
+                  Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                  function name: explode
+                  Lateral View Join Operator
+                    outputColumnNames: _col5, _col6
+                    Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      keys: _col5 (type: int), _col6 (type: int)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: int)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                        Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+      Reduce Operator Tree:
+        Group By Operator
+          keys: KEY._col0 (type: int), KEY._col1 (type: int)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+          Select Operator
+            expressions: _col0 (type: int), _col1 (type: int)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+            Limit
+              Number of rows: 3
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+              File Output Operator
+                compressed: false
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 3
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+1	1
+2	2
+3	3
+PREHOOK: query: DROP TEMPORARY FUNCTION explode2
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: DROP TEMPORARY FUNCTION explode2
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: explode2
diff --git ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
new file mode 100644
index 0000000..07f7349
--- /dev/null
+++ ql/src/test/results/clientpositive/spark/lateral_view_explode2.q.out
@@ -0,0 +1,108 @@
+PREHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: explode2
+PREHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 2)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+                  Lateral View Forward
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      Statistics: Num rows: 500 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                      Lateral View Join Operator
+                        outputColumnNames: _col5, _col6
+                        Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          keys: _col5 (type: int), _col6 (type: int)
+                          mode: hash
+                          outputColumnNames: _col0, _col1
+                          Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                          Reduce Output Operator
+                            key expressions: _col0 (type: int), _col1 (type: int)
+                            sort order: ++
+                            Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                    Select Operator
+                      expressions: array(1,2,3) (type: array<int>)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                      UDTF Operator
+                        Statistics: Num rows: 500 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                        function name: explode
+                        Lateral View Join Operator
+                          outputColumnNames: _col5, _col6
+                          Statistics: Num rows: 1000 Data size: 28000 Basic stats: COMPLETE Column stats: COMPLETE
+                          Group By Operator
+                            keys: _col5 (type: int), _col6 (type: int)
+                            mode: hash
+                            outputColumnNames: _col0, _col1
+                            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+                            Reduce Output Operator
+                              key expressions: _col0 (type: int), _col1 (type: int)
+                              sort order: ++
+                              Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
+        Reducer 2
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: int), KEY._col1 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col0 (type: int), _col1 (type: int)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                  Limit
+                    Number of rows: 3
+                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 3
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+2	2
+1	1
+3	3
+PREHOOK: query: DROP TEMPORARY FUNCTION explode2
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: explode2
+POSTHOOK: query: DROP TEMPORARY FUNCTION explode2
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: explode2
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index 00aa4ec..22b7e89 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -53,4 +53,9 @@
    */
   Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
 
+  /**
+   * Return all jar paths which were added through AddJarJob.
+   */
+  List<String> getAddedJars();
+
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index 1eb3ff2..8d353ce 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -32,11 +33,13 @@
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
+  private final List<String> addedJars;
 
   public JobContextImpl(JavaSparkContext sc) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
+    addedJars = new CopyOnWriteArrayList<String>();
   }
 
@@ -57,6 +60,11 @@ public JavaSparkContext sc() {
     return monitoredJobs;
   }
 
+  @Override
+  public List<String> getAddedJars() {
+    return addedJars;
+  }
+
   void setMonitorCb(MonitorCallback cb) {
     monitorCb.set(cb);
   }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 5f9be65..aea90db 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -529,6 +529,9 @@ public void run() {
     @Override
     public Serializable call(JobContext jc) throws Exception {
       jc.sc().addJar(path);
+      // The following remote jobs may refer to classes in this jar, and those jobs may run in a
+      // different thread, so we record the jar path in the JobContext for later use.
+      jc.getAddedJars().add(path);
       return null;
     }
 
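
The hunk above only shows the changed lines inside SparkClientImpl's AddJarJob. For orientation, a rough sketch of how the enclosing job presumably looks, assuming the spark-client Job interface seen in the hunks above (the field and constructor shapes are guesses, not copied from SparkClientImpl):

    // Sketch only; Job and JobContext come from the spark-client module shown in this patch.
    static class AddJarJob implements Job<Serializable> {
      private static final long serialVersionUID = 1L;

      private final String path;

      AddJarJob(String path) {
        this.path = path;
      }

      @Override
      public Serializable call(JobContext jc) throws Exception {
        jc.sc().addJar(path);          // ship the jar to the executors via the SparkContext
        jc.getAddedJars().add(path);   // remember the path so later jobs can extend the driver-side classpath
        return null;
      }
    }
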
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
new file mode 100644
index 0000000..93fceaf
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SparkClientUtilities {
+  protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
+
+  /**
+   * Add new elements to the classpath.
+   *
+   * @param newPaths Array of classpath elements
+   */
+  public static void addToClassPath(String[] newPaths) throws Exception {
+    ClassLoader cloader = Thread.currentThread().getContextClassLoader();
+    URLClassLoader loader = (URLClassLoader) cloader;
+    List<URL> curPath = Lists.newArrayList(loader.getURLs());
+
+    for (String newPath : newPaths) {
+      URL newUrl = urlFromPathString(newPath);
+      if (newUrl != null && !curPath.contains(newUrl)) {
+        curPath.add(newUrl);
+        LOG.info("Added jar[" + newUrl + "] to classpath.");
+      }
+    }
+
+    URLClassLoader newLoader = new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
+    Thread.currentThread().setContextClassLoader(newLoader);
+  }
+
+  /**
+   * Create a URL from a string representing a path to a local file.
+   * The path string can be just a path, or can start with file:/ or file:///.
+   *
+   * @param path path string
+   * @return a URL for the path, or null if the path cannot be parsed
+   */
+  private static URL urlFromPathString(String path) {
+    URL url = null;
+    try {
+      if (StringUtils.indexOf(path, "file:/") == 0) {
+        url = new URL(path);
+      } else {
+        url = new File(path).toURL();
+      }
+    } catch (Exception err) {
+      LOG.error("Bad URL " + path + ", ignoring path");
+    }
+    return url;
+  }
+}
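
A short usage sketch of the new helper (the jar path below is hypothetical): once addToClassPath has installed the widened URLClassLoader, classes from the added jar resolve through the thread's context classloader, which is what the getBaseWork() and JobStatusJob changes above rely on.

    // Hypothetical path and lookup, for illustration only.
    String[] jars = new String[] { "/tmp/hive-contrib.jar" };
    SparkClientUtilities.addToClassPath(jars);
    Class<?> udtf = Class.forName(
        "org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2",
        true, Thread.currentThread().getContextClassLoader());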