diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
new file mode 100644
index 0000000..b87de09
--- /dev/null
+++ b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.genericudf.example;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * GenericUDFAdd10: example GenericUDF that returns its numeric argument plus 10.
+ *
+ */
+@Description(name = "add10",
+ value = "_FUNC_(x) - returns 10 plus the original value of x",
+ extended = "Example:\n"
+ + " > SELECT _FUNC_(0) FROM src LIMIT 1;\n"
+ + " 10\n"
+ + " > SELECT _FUNC_(-5) FROM src LIMIT 1;\n" + " 5")
+public class GenericUDFAdd10 extends GenericUDF {
+ private transient PrimitiveCategory inputType;
+ private final DoubleWritable resultDouble = new DoubleWritable();
+ private final LongWritable resultLong = new LongWritable();
+ private final IntWritable resultInt = new IntWritable();
+ private final HiveDecimalWritable resultDecimal = new HiveDecimalWritable();
+ private transient PrimitiveObjectInspector argumentOI;
+ private transient Converter inputConverter;
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+ if (arguments.length != 1) {
+ throw new UDFArgumentLengthException(
+ "ADD10() requires 1 argument, got " + arguments.length);
+ }
+
+ if (arguments[0].getCategory() != Category.PRIMITIVE) {
+ throw new UDFArgumentException(
+ "ADD10 only takes primitive types, got " + arguments[0].getTypeName());
+ }
+ argumentOI = (PrimitiveObjectInspector) arguments[0];
+
+ inputType = argumentOI.getPrimitiveCategory();
+ ObjectInspector outputOI = null;
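+ // Pick a converter and output inspector based on the argument's primitive category:
+ // BYTE/SHORT/INT are widened to int, FLOAT/STRING are converted to double.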
+ switch (inputType) {
+ case SHORT:
+ case BYTE:
+ case INT:
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+ outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+ break;
+ case LONG:
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+ outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ break;
+ case FLOAT:
+ case STRING:
+ case DOUBLE:
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ break;
+ case DECIMAL:
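+ // Reuse the argument's decimal type info so the result keeps its precision and scale.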
+ outputOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+ ((PrimitiveObjectInspector) arguments[0]).getTypeInfo());
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ outputOI);
+ break;
+ default:
+ throw new UDFArgumentException(
+ "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+ }
+ return outputOI;
+ }
+
+ @Override
+ public Object evaluate(DeferredObject[] arguments) throws HiveException {
+ Object valObject = arguments[0].get();
+ if (valObject == null) {
+ return null;
+ }
+ switch (inputType) {
+ case SHORT:
+ case BYTE:
+ case INT:
+ valObject = inputConverter.convert(valObject);
+ resultInt.set(10 + ((IntWritable) valObject).get());
+ return resultInt;
+ case LONG:
+ valObject = inputConverter.convert(valObject);
+ resultLong.set(10 + ((LongWritable) valObject).get());
+ return resultLong;
+ case FLOAT:
+ case STRING:
+ case DOUBLE:
+ valObject = inputConverter.convert(valObject);
+ resultDouble.set(10.0 + ((DoubleWritable) valObject).get());
+ return resultDouble;
+ case DECIMAL:
+ HiveDecimalObjectInspector decimalOI =
+ (HiveDecimalObjectInspector) argumentOI;
+ HiveDecimalWritable val = decimalOI.getPrimitiveWritableObject(valObject);
+
+ if (val != null) {
+ resultDecimal.set(val.getHiveDecimal().add(HiveDecimal.create("10")));
+ val = resultDecimal;
+ }
+ return val;
+ default:
+ throw new UDFArgumentException(
+ "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+ }
+ }
+
+ @Override
+ public String getDisplayString(String[] children) {
+ return getStandardDisplayString("add10", children);
+ }
+
+}
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index ada3f3b..009395d 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -202,7 +202,7 @@
<name>spark.master</name>
- <value>yarn-client</value>
+ <value>yarn-cluster</value>
@@ -261,4 +261,9 @@
Internal marker for test. Used for masking env-dependent values
+
+<property>
+  <name>hive.spark.client.connect.timeout</name>
+  <value>30000ms</value>
+</property>
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 6286c9d..db8d568 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1226,6 +1226,7 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
empty_dir_in_table.q,\
external_table_with_space_in_location_path.q,\
file_with_header_footer.q,\
+ gen_udf_example_add10.q,\
import_exported_table.q,\
index_bitmap3.q,\
index_bitmap_auto.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 259c12f..d215873 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -133,6 +133,11 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
LOG.info(String.format(
"load yarn property from hive configuration in %s mode (%s -> %s).",
sparkMaster, propertyName, value));
+ } else if (propertyName.equals(HiveConf.ConfVars.HADOOPFS.varname)) {
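+ // Forward the default filesystem setting to Spark via the spark.hadoop.* prefix so
+ // the remote driver and executors resolve paths against the same default FS.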
+ String value = hiveConf.get(propertyName);
+ if (value != null && !value.isEmpty()) {
+ sparkConf.put("spark.hadoop." + propertyName, value);
+ }
}
if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
String value = RpcConfiguration.getValue(hiveConf, propertyName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index c4cb2ba..1f0938e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -297,9 +297,10 @@ public Serializable call(JobContext jc) throws Exception {
// may need to load classes from this jar in other threads.
Map<String, Long> addedJars = jc.getAddedJars();
if (addedJars != null && !addedJars.isEmpty()) {
- SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
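+ // Record the locally accessible jar paths returned by addToClassPath rather than
+ // the original added-jar keys, which may point at remote locations.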
+ List<String> localAddedJars = SparkClientUtilities.addToClassPath(addedJars,
+ localJobConf, jc.getLocalTmpDir());
KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
- localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
+ localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(localAddedJars, ";"));
}
Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 0268469..b2da5c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -89,7 +89,8 @@ public static URI getURI(String path) throws URISyntaxException {
*/
public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
Path localFile = new Path(source.getPath());
- Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
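+ // Put the file in the Spark-session-specific HDFS dir, which is created lazily
+ // and cleaned up when the Spark session closes.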
+ Path remoteFile = new Path(SessionState.get().getSparkSession().getHDFSSessionDir(),
+ getFileName(source));
FileSystem fileSystem = FileSystem.get(conf);
// Overwrite if the remote file already exists. Whether the file can be added
// on executor is up to spark, i.e. spark.files.overwrite
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index 3d4b39b..e1f8c1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.session;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
@@ -24,6 +25,8 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import java.io.IOException;
+
public interface SparkSession {
/**
* Initializes a Spark session for DAG execution.
@@ -67,4 +70,9 @@
* Close session and release resources.
*/
void close();
+
+ /**
+ * Get an HDFS directory specific to this SparkSession.
+ */
+ Path getHDFSSessionDir() throws IOException;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index f04e145..51c6715 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -20,6 +20,10 @@
import java.io.IOException;
import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.ObjectPair;
@@ -37,11 +41,14 @@
public class SparkSessionImpl implements SparkSession {
private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
+ private static final String SPARK_DIR = "_spark_session_dir";
private HiveConf conf;
private boolean isOpen;
private final String sessionId;
private HiveSparkClient hiveSparkClient;
+ // volatile so the lazy initialization in getHDFSSessionDir() is safe across threads
+ private volatile Path scratchDir;
+ private final Object dirLock = new Object();
public SparkSessionImpl() {
sessionId = makeSessionId();
@@ -118,6 +125,7 @@ public void close() {
if (hiveSparkClient != null) {
try {
hiveSparkClient.close();
+ cleanScratchDir();
} catch (IOException e) {
LOG.error("Failed to close spark session (" + sessionId + ").", e);
}
@@ -125,6 +133,37 @@ public void close() {
hiveSparkClient = null;
}
+ private Path createScratchDir() throws IOException {
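+ // Layout: <HDFS session scratch dir>/_spark_session_dir/<spark session id>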
+ Path parent = new Path(SessionState.get().getHdfsScratchDirURIString(), SPARK_DIR);
+ Path sparkDir = new Path(parent, sessionId);
+ FileSystem fs = sparkDir.getFileSystem(conf);
+ FsPermission fsPermission = new FsPermission(HiveConf.getVar(
+ conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
+ fs.mkdirs(sparkDir, fsPermission);
+ fs.deleteOnExit(sparkDir);
+ return sparkDir;
+ }
+
+ private void cleanScratchDir() throws IOException {
+ if (scratchDir != null) {
+ FileSystem fs = scratchDir.getFileSystem(conf);
+ fs.delete(scratchDir, true);
+ scratchDir = null;
+ }
+ }
+
+ @Override
+ public Path getHDFSSessionDir() throws IOException {
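+ // Lazily create the per-session dir; double-checked locking on dirLock keeps
+ // concurrent callers from creating more than one directory.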
+ if (scratchDir == null) {
+ synchronized (dirLock) {
+ if (scratchDir == null) {
+ scratchDir = createScratchDir();
+ }
+ }
+ }
+ return scratchDir;
+ }
+
public static String makeSessionId() {
return UUID.randomUUID().toString();
}
diff --git a/ql/src/test/queries/clientpositive/gen_udf_example_add10.q b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
new file mode 100644
index 0000000..69178c9
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
@@ -0,0 +1,13 @@
+add jar ${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:hive.version}/hive-contrib-${system:hive.version}.jar;
+
+create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10';
+
+create table t1(x int,y double);
+load data local inpath '../../data/files/T1.txt' into table t1;
+
+explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10;
+
+select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10;
+
+drop table t1;
+drop temporary function example_add10;
diff --git a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
new file mode 100644
index 0000000..984554d
--- /dev/null
+++ b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
@@ -0,0 +1,95 @@
+PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: example_add10
+PREHOOK: query: create table t1(x int,y double)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+POSTHOOK: query: create table t1(x int,y double)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: t1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: add10(x) (type: int), add10(y) (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: double)
+ sort order: -+
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Limit
+ Number of rows: 10
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 10
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+18 28.0
+18 38.0
+17 27.0
+13 23.0
+12 22.0
+11 21.0
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+PREHOOK: query: drop temporary function example_add10
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: drop temporary function example_add10
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: example_add10
diff --git a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
new file mode 100644
index 0000000..05ec1f5
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
@@ -0,0 +1,101 @@
+PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: example_add10
+PREHOOK: query: create table t1(x int,y double)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+POSTHOOK: query: create table t1(x int,y double)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (SORT, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: t1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: add10(x) (type: int), add10(y) (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: double)
+ sort order: -+
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Reducer 2
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Limit
+ Number of rows: 10
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 10
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+18 28.0
+18 38.0
+17 27.0
+13 23.0
+12 22.0
+11 21.0
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+PREHOOK: query: drop temporary function example_add10
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: drop temporary function example_add10
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: example_add10
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index b779f3f..6251861 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -23,6 +23,7 @@
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,15 +44,18 @@
* Add new elements to the classpath.
*
* @param newPaths Map of classpath elements and corresponding timestamp
+ * @return locally accessible file paths corresponding to {@code newPaths}
*/
- public static void addToClassPath(Map<String, Long> newPaths, Configuration conf, File localTmpDir)
- throws Exception {
+ public static List<String> addToClassPath(Map<String, Long> newPaths, Configuration conf,
+ File localTmpDir) throws Exception {
URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
List<URL> curPath = Lists.newArrayList(loader.getURLs());
+ List<String> localNewPaths = new ArrayList<>();
boolean newPathAdded = false;
for (Map.Entry<String, Long> entry : newPaths.entrySet()) {
URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir);
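+ // urlFromPathString is expected to localize remote jars under localTmpDir and return
+ // a URL to the local copy, or null if the path cannot be handled.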
+ if (newUrl != null) {
+ localNewPaths.add(newUrl.toString());
+ }
if (newUrl != null && !curPath.contains(newUrl)) {
curPath.add(newUrl);
LOG.info("Added jar[" + newUrl + "] to classpath.");
@@ -64,6 +68,7 @@ public static void addToClassPath(Map<String, Long> newPaths, Configuration conf
new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
Thread.currentThread().setContextClassLoader(newLoader);
}
+ return localNewPaths;
}
/**