diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
new file mode 100644
index 0000000..b87de09
--- /dev/null
+++ b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.genericudf.example;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * GenericUDFAdd10.
+ *
+ */
+@Description(name = "add10",
+    value = "_FUNC_(x) - returns 10 plus the original value of x",
+    extended = "Example:\n"
+    + "  > SELECT _FUNC_(0) FROM src LIMIT 1;\n"
+    + "  10\n"
+    + "  > SELECT _FUNC_(-5) FROM src LIMIT 1;\n" + "  5")
+public class GenericUDFAdd10 extends GenericUDF {
+  private transient PrimitiveCategory inputType;
+  private final DoubleWritable resultDouble = new DoubleWritable();
+  private final LongWritable resultLong = new LongWritable();
+  private final IntWritable resultInt = new IntWritable();
+  private final HiveDecimalWritable resultDecimal = new HiveDecimalWritable();
+  private transient PrimitiveObjectInspector argumentOI;
+  private transient Converter inputConverter;
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+    if (arguments.length != 1) {
+      throw new UDFArgumentLengthException(
+          "ADD10() requires 1 argument, got " + arguments.length);
+    }
+
+    if (arguments[0].getCategory() != Category.PRIMITIVE) {
+      throw new UDFArgumentException(
+          "ADD10 only takes primitive types, got " + arguments[0].getTypeName());
+    }
+    argumentOI = (PrimitiveObjectInspector) arguments[0];
+
+    inputType = argumentOI.getPrimitiveCategory();
+    ObjectInspector outputOI = null;
+    switch (inputType) {
+    case SHORT:
+    case BYTE:
+    case INT:
+      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+          PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+      outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+      break;
+    case LONG:
+      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+          PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+      outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+      break;
+    case FLOAT:
+    case STRING:
+    case DOUBLE:
+      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+          PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+      outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+      break;
+    case DECIMAL:
+      outputOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+          ((PrimitiveObjectInspector) arguments[0]).getTypeInfo());
+      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+          outputOI);
+      break;
+    default:
+      throw new UDFArgumentException(
+          "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+    }
+    return outputOI;
+  }
+
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    Object valObject = arguments[0].get();
+    if (valObject == null) {
+      return null;
+    }
+    switch (inputType) {
+    case SHORT:
+    case BYTE:
+    case INT:
+      valObject = inputConverter.convert(valObject);
+      resultInt.set(10 + ((IntWritable) valObject).get());
+      return resultInt;
+    case LONG:
+      valObject = inputConverter.convert(valObject);
+      resultLong.set(10 + ((LongWritable) valObject).get());
+      return resultLong;
+    case FLOAT:
+    case STRING:
+    case DOUBLE:
+      valObject = inputConverter.convert(valObject);
+      resultDouble.set(10.0 + ((DoubleWritable) valObject).get());
+      return resultDouble;
+    case DECIMAL:
+      HiveDecimalObjectInspector decimalOI =
+          (HiveDecimalObjectInspector) argumentOI;
+      HiveDecimalWritable val = decimalOI.getPrimitiveWritableObject(valObject);
+
+      if (val != null) {
+        resultDecimal.set(val.getHiveDecimal().add(HiveDecimal.create("10")));
+        val = resultDecimal;
+      }
+      return val;
+    default:
+      throw new UDFArgumentException(
+          "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+    }
+  }
+
+  @Override
+  public String getDisplayString(String[] children) {
+    return getStandardDisplayString("add10", children);
+  }
+
+}
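The UDF above follows the usual GenericUDF two-phase contract: initialize() inspects the argument's ObjectInspector once per query, wires up a Converter, and picks a matching writable output inspector; evaluate() then runs once per row, reusing a preallocated writable to avoid per-row allocation. A minimal driver-style sketch of that contract (not part of the patch, written against the standard GenericUDF API):

    import org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.IntWritable;

    public class Add10Sketch {
      public static void main(String[] args) throws Exception {
        GenericUDFAdd10 udf = new GenericUDFAdd10();
        // initialize() runs once per query: for an int argument it sets up a
        // converter and returns the writable int inspector from the INT branch.
        ObjectInspector outputOI = udf.initialize(new ObjectInspector[] {
            PrimitiveObjectInspectorFactory.javaIntObjectInspector });
        System.out.println(outputOI.getTypeName()); // int
        // evaluate() runs once per row; the same IntWritable instance is reused.
        Object result = udf.evaluate(new GenericUDF.DeferredObject[] {
            new GenericUDF.DeferredJavaObject(5) });
        System.out.println(((IntWritable) result).get()); // 15
      }
    }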
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index ada3f3b..009395d 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -202,7 +202,7 @@
 
 <property>
   <name>spark.master</name>
-  <value>yarn-client</value>
+  <value>yarn-cluster</value>
 </property>
 
@@ -261,4 +261,9 @@
     Internal marker for test. Used for masking env-dependent values
   </description>
 </property>
+
+<property>
+  <name>hive.spark.client.connect.timeout</name>
+  <value>30000ms</value>
+</property>
 </configuration>
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 6286c9d..db8d568 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1226,6 +1226,7 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   empty_dir_in_table.q,\
   external_table_with_space_in_location_path.q,\
   file_with_header_footer.q,\
+  gen_udf_example_add10.q,\
   import_exported_table.q,\
   index_bitmap3.q,\
   index_bitmap_auto.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 259c12f..d215873 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -133,6 +133,11 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
         LOG.info(String.format(
           "load yarn property from hive configuration in %s mode (%s -> %s).",
           sparkMaster, propertyName, value));
+      } else if (propertyName.equals(HiveConf.ConfVars.HADOOPFS.varname)) {
+        String value = hiveConf.get(propertyName);
+        if (value != null && !value.isEmpty()) {
+          sparkConf.put("spark.hadoop." + propertyName, value);
+        }
       }
       if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
         String value = RpcConfiguration.getValue(hiveConf, propertyName);
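The new branch piggybacks on Spark's spark.hadoop.* convention: any SparkConf entry with that prefix is copied, prefix stripped, into the Hadoop Configuration that Spark builds for its driver and executors. That matters once spark.master is yarn-cluster (see the hive-site.xml change above), because the driver no longer runs on the client machine and cannot pick up the client's fs.defaultFS locally. A simplified model of the convention, not Spark's actual code:

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    public class SparkHadoopPrefixSketch {
      // Simplified model of what Spark does with spark.hadoop.* entries when
      // it builds the Hadoop Configuration on the cluster side; shown only to
      // explain why the prefix above works.
      static Configuration toHadoopConf(Map<String, String> sparkConf) {
        Configuration hadoopConf = new Configuration();
        for (Map.Entry<String, String> e : sparkConf.entrySet()) {
          if (e.getKey().startsWith("spark.hadoop.")) {
            // "spark.hadoop.fs.defaultFS" becomes plain "fs.defaultFS".
            hadoopConf.set(e.getKey().substring("spark.hadoop.".length()), e.getValue());
          }
        }
        return hadoopConf;
      }
    }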
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index c4cb2ba..1f0938e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -297,9 +297,10 @@ public Serializable call(JobContext jc) throws Exception {
       // may need to load classes from this jar in other threads.
       Map<String, Long> addedJars = jc.getAddedJars();
       if (addedJars != null && !addedJars.isEmpty()) {
-        SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
+        List<String> localAddedJars = SparkClientUtilities.addToClassPath(addedJars,
+            localJobConf, jc.getLocalTmpDir());
         KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
-        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(localAddedJars, ";"));
       }
 
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 0268469..b2da5c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -89,7 +89,8 @@ public static URI getURI(String path) throws URISyntaxException {
    */
   public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
     Path localFile = new Path(source.getPath());
-    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
+    Path remoteFile = new Path(SessionState.get().getSparkSession().getHDFSSessionDir(),
+        getFileName(source));
     FileSystem fileSystem = FileSystem.get(conf);
     // Overwrite if the remote file already exists. Whether the file can be added
     // on executor is up to spark, i.e. spark.files.overwrite
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index 3d4b39b..868514d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -67,4 +68,9 @@
    * Close session and release resources.
    */
   void close();
+
+  /**
+   * Get an HDFS dir specific to the SparkSession.
+   */
+  Path getHDFSSessionDir();
 }
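The substantive change in RemoteHiveSparkClient is which strings end up in HIVE_ADDED_JARS: addedJars.keySet() holds the URIs as the client submitted them, while the list now returned by addToClassPath() names the localized copies under the job's temporary directory, the only paths guaranteed to resolve where the remote driver runs in yarn-cluster mode. A small sketch of the difference, with invented paths:

    import java.util.Arrays;
    import java.util.List;

    public class AddedJarsSketch {
      public static void main(String[] args) {
        // Invented paths, for illustration only.
        // What the client submitted (the old keySet() contents):
        List<String> submitted = Arrays.asList("hdfs://nn:8020/user/hive/hive-contrib.jar");
        // What addToClassPath() reports after localizing the jar:
        List<String> localized = Arrays.asList("file:/tmp/job-tmp/hive-contrib.jar");

        // Before the patch, HIVE_ADDED_JARS carried the first form, which tasks
        // on a remote driver could not always open; it now carries the second.
        System.out.println(String.join(";", submitted));
        System.out.println(String.join(";", localized));
      }
    }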
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index f04e145..5c8d744 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -20,6 +20,10 @@
 import java.io.IOException;
 import java.util.UUID;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -37,11 +41,13 @@ public class SparkSessionImpl implements SparkSession {
   private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
+  private static final String SPARK_DIR = "_spark_session_dir";
 
   private HiveConf conf;
   private boolean isOpen;
   private final String sessionId;
   private HiveSparkClient hiveSparkClient;
+  private Path scratchDir;
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
   }
 
@@ -53,6 +59,7 @@ public void open(HiveConf conf) throws HiveException {
     isOpen = true;
     try {
       hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf);
+      scratchDir = createScratchDir();
     } catch (Throwable e) {
       throw new HiveException("Failed to create spark client.", e);
     }
@@ -118,6 +125,7 @@ public void close() {
     if (hiveSparkClient != null) {
       try {
         hiveSparkClient.close();
+        cleanScratchDir();
       } catch (IOException e) {
         LOG.error("Failed to close spark session (" + sessionId + ").", e);
       }
@@ -125,6 +133,30 @@
     hiveSparkClient = null;
   }
 
+  private Path createScratchDir() throws IOException {
+    Path parent = new Path(SessionState.get().getHdfsScratchDirURIString(), SPARK_DIR);
+    Path sparkDir = new Path(parent, sessionId);
+    FileSystem fs = sparkDir.getFileSystem(conf);
+    FsPermission fsPermission = new FsPermission(HiveConf.getVar(
+        conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
+    fs.mkdirs(sparkDir, fsPermission);
+    fs.deleteOnExit(sparkDir);
+    return sparkDir;
+  }
+
+  private void cleanScratchDir() throws IOException {
+    if (scratchDir != null) {
+      FileSystem fs = scratchDir.getFileSystem(conf);
+      fs.delete(scratchDir, true);
+      scratchDir = null;
+    }
+  }
+
+  @Override
+  public Path getHDFSSessionDir() {
+    return scratchDir;
+  }
+
   public static String makeSessionId() {
     return UUID.randomUUID().toString();
   }
diff --git a/ql/src/test/queries/clientpositive/gen_udf_example_add10.q b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
new file mode 100644
index 0000000..69178c9
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
@@ -0,0 +1,13 @@
+add jar ${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:hive.version}/hive-contrib-${system:hive.version}.jar;
+
+create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10';
+
+create table t1(x int,y double);
+load data local inpath '../../data/files/T1.txt' into table t1;
+
+explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10;
+
+select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10;
+
+drop table t1;
+drop temporary function example_add10;
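With these changes each Spark session gets a private HDFS directory keyed by its UUID: created in open() with the configured scratch-dir permission, removed in close() via cleanScratchDir(), and covered by deleteOnExit() as a backstop. SparkUtilities.uploadToHDFS() now stages files there instead of in the generic HDFS session path. A sketch of the resulting contract, with an illustrative URI:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class SessionDirSketch {
      // Sketch only; assumes an active SessionState whose SparkSession is open.
      static Path sparkSessionDir() {
        SparkSession session = SessionState.get().getSparkSession();
        // Layout created by createScratchDir() above, e.g. (illustrative URI):
        //   hdfs://nn:8020/tmp/hive/<user>/<session>/_spark_session_dir/<spark-session-uuid>
        // Files uploaded via SparkUtilities.uploadToHDFS() land under this
        // directory and are deleted when the Spark session closes.
        return session.getHDFSSessionDir();
      }
    }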
diff --git a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
new file mode 100644
index 0000000..984554d
--- /dev/null
+++ b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
@@ -0,0 +1,95 @@
+PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: example_add10
+PREHOOK: query: create table t1(x int,y double)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+POSTHOOK: query: create table t1(x int,y double)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t1
+            Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: add10(x) (type: int), add10(y) (type: double)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                key expressions: _col0 (type: int), _col1 (type: double)
+                sort order: -+
+                Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+      Reduce Operator Tree:
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 10
+            Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+18	28.0
+18	38.0
+17	27.0
+13	23.0
+12	22.0
+11	21.0
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+PREHOOK: query: drop temporary function example_add10
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: drop temporary function example_add10
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: example_add10
diff --git a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
new file mode 100644
index 0000000..05ec1f5
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
@@ -0,0 +1,101 @@
+PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: example_add10
+PREHOOK: query: create table t1(x int,y double)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+POSTHOOK: query: create table t1(x int,y double)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: t1
+                  Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: add10(x) (type: int), add10(y) (type: double)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int), _col1 (type: double)
+                      sort order: -+
+                      Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+        Reducer 2
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Limit
+                  Number of rows: 10
+                  Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+18	28.0
+18	38.0
+17	27.0
+13	23.0
+12	22.0
+11	21.0
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+PREHOOK: query: drop temporary function example_add10
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: drop temporary function example_add10
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: example_add10
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index b779f3f..6251861 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,15 +44,21 @@
    * Add new elements to the classpath.
    *
    * @param newPaths Map of classpath elements and corresponding timestamp
+   * @return locally accessible files corresponding to the newPaths
    */
-  public static void addToClassPath(Map<String, Long> newPaths, Configuration conf, File localTmpDir)
-      throws Exception {
+  public static List<String> addToClassPath(Map<String, Long> newPaths, Configuration conf,
+      File localTmpDir) throws Exception {
     URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
     List<URL> curPath = Lists.newArrayList(loader.getURLs());
+    List<String> localNewPaths = new ArrayList<>();
 
     boolean newPathAdded = false;
     for (Map.Entry<String, Long> entry : newPaths.entrySet()) {
       URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir);
+      // urlFromPathString() may return null for an unresolvable path; guard before use.
+      if (newUrl != null) {
+        localNewPaths.add(newUrl.toString());
+      }
       if (newUrl != null && !curPath.contains(newUrl)) {
         curPath.add(newUrl);
         LOG.info("Added jar[" + newUrl + "] to classpath.");
@@ -64,6 +70,7 @@ public static void addToClassPath(Map<String, Long> newPaths, Configuration conf
           new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
       Thread.currentThread().setContextClassLoader(newLoader);
     }
+    return localNewPaths;
   }
 
   /**
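One detail worth keeping straight in addToClassPath(): urlFromPathString() can return null for a path it cannot resolve, which is why the classpath update is null-guarded, so the localized path must be recorded under the same guard (as done above) rather than dereferenced first. A caller-side sketch of the new contract, with invented paths:

    import java.io.File;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hive.spark.client.SparkClientUtilities;

    public class AddToClassPathSketch {
      public static void main(String[] args) throws Exception {
        Map<String, Long> newPaths = new HashMap<>();
        newPaths.put("hdfs://nn:8020/tmp/udfs/hive-contrib.jar", System.currentTimeMillis());

        // Downloads each jar into the local tmp dir, extends the context class
        // loader, and returns the URLs of the local copies.
        List<String> localJars = SparkClientUtilities.addToClassPath(
            newPaths, new Configuration(), new File("/tmp/job-local"));

        // Joined with ";", this is the value RemoteHiveSparkClient now stores
        // under Utilities.HIVE_ADDED_JARS.
        System.out.println(String.join(";", localJars));
      }
    }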