diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 09d9ce3..4bfe517 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -91,6 +91,7 @@
   private static final Map<String, String> metaConfs = new HashMap<String, String>();
   private final List<String> restrictList = new ArrayList<String>();
   private final Set<String> hiddenSet = new HashSet<String>();
+  private final List<String> rscList = new ArrayList<>();
 
   private Pattern modWhiteListPattern = null;
   private volatile boolean isSparkConfigUpdated = false;
@@ -3569,6 +3570,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
       "hive.spark.client.secret.bits," +
       "hive.spark.client.rpc.server.address," +
       "hive.spark.client.rpc.server.port," +
+      "hive.spark.client.rpc.sasl.mechanisms," +
       "bonecp.,"+
       "hive.druid.broker.address.default,"+
       "hive.druid.coordinator.address.default,"+
@@ -3589,6 +3591,12 @@
       "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
       "Comma separated list of variables which are used internally and should not be configurable."),
 
+  HIVE_SPARK_RSC_CONF_LIST("hive.spark.rsc.conf.list",
+      SPARK_OPTIMIZE_SHUFFLE_SERDE.varname + "," +
+      SPARK_CLIENT_FUTURE_TIMEOUT.varname,
+      "Comma separated list of variables which are related to remote spark context.\n" +
+      "Changing these variables will result in re-creating the spark session."),
+
   HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s", new TimeValidator(TimeUnit.SECONDS),
       "Timeout for Running Query in seconds. A nonpositive value means infinite. " +
@@ -3916,7 +3924,7 @@ private boolean isSparkRelatedConfig(String name) {
       if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
         result = true;
       }
-    } else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
+    } else if (rscList.stream().anyMatch(rscVar -> rscVar.equals(name))) { // Remote Spark Context property.
       result = true;
     } else if (name.equals("mapreduce.job.queuename")) {
       // a special property starting with mapreduce that we would also like to effect if it changes
@@ -4398,6 +4406,7 @@ private void initialize(Class<?> cls) {
     setupRestrictList();
     hiddenSet.clear();
     hiddenSet.addAll(HiveConfUtil.getHiddenSet(this));
+    setupRSCList();
   }
 
   /**
@@ -4788,6 +4797,17 @@ private void setupRestrictList() {
     restrictList.add(ConfVars.HIVE_CONF_RESTRICTED_LIST.varname);
     restrictList.add(ConfVars.HIVE_CONF_HIDDEN_LIST.varname);
     restrictList.add(ConfVars.HIVE_CONF_INTERNAL_VARIABLE_LIST.varname);
+    restrictList.add(ConfVars.HIVE_SPARK_RSC_CONF_LIST.varname);
+  }
+
+  private void setupRSCList() {
+    rscList.clear();
+    String vars = this.getVar(ConfVars.HIVE_SPARK_RSC_CONF_LIST);
+    if (vars != null) {
+      for (String var : vars.split(",")) {
+        rscList.add(var.trim());
+      }
+    }
   }
 
   /**
diff --git a/data/scripts/sleep.py b/data/scripts/sleep.py
new file mode 100644
index 0000000..342f14c
--- /dev/null
+++ b/data/scripts/sleep.py
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys
+import time
+
+for line in sys.stdin.readlines():
+  time.sleep(3)
\ No newline at end of file
diff --git a/ql/src/test/queries/clientnegative/spark_job_max_tasks.q b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
index 7473050..a638f83 100644
--- a/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
+++ b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
@@ -1,6 +1,10 @@
 set hive.spark.job.max.tasks=2;
 
+add file ../../data/scripts/sleep.py;
+
 EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
 
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
diff --git a/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q b/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
index 5bdb014..fd43b67 100644
--- a/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
+++ b/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
@@ -1,6 +1,10 @@
 set hive.spark.stage.max.tasks=1;
 
+add file ../../data/scripts/sleep.py;
+
 EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
 
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
diff --git a/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
index ba2f09e..b259b63 100644
--- a/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
+++ b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
@@ -1,8 +1,10 @@
 PREHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 PREHOOK: type: QUERY
 POSTHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -22,39 +24,43 @@ STAGE PLANS:
                   alias: src1
                   Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
+                    expressions: key (type: string)
+                    outputColumnNames: key
                     Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                     Group By Operator
-                      aggregations: sum(value)
                       keys: key (type: string)
                       mode: hash
-                      outputColumnNames: _col0, _col1
+                      outputColumnNames: _col0
                       Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                       Reduce Output Operator
                         key expressions: _col0 (type: string)
                         sort order: +
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: double)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator
-                aggregations: sum(VALUE._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1
+                outputColumnNames: _col0
                 Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col1 (type: double)
-                  sort order: +
+                Transform Operator
+                  command: python sleep.py
+                  output info:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col0 (type: string)
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: string)
         Reducer 3 
            Reduce Operator Tree:
              Select Operator
-                expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
-                outputColumnNames: _col0, _col1
+                expressions: VALUE._col0 (type: string)
+                outputColumnNames: _col0
                 Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
@@ -70,7 +76,8 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: query: SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src1
 #### A masked pattern was here ####
diff --git a/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
index ba2f09e..b259b63 100644
--- a/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
+++ b/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
@@ -1,8 +1,10 @@
 PREHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 PREHOOK: type: QUERY
 POSTHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -22,39 +24,43 @@ STAGE PLANS:
                   alias: src1
                   Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
+                    expressions: key (type: string)
+                    outputColumnNames: key
                     Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                     Group By Operator
-                      aggregations: sum(value)
                       keys: key (type: string)
                       mode: hash
-                      outputColumnNames: _col0, _col1
+                      outputColumnNames: _col0
                       Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                       Reduce Output Operator
                         key expressions: _col0 (type: string)
                         sort order: +
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: double)
         Reducer 2 
             Reduce Operator Tree:
              Group By Operator
-                aggregations: sum(VALUE._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1
+                outputColumnNames: _col0
                 Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col1 (type: double)
-                  sort order: +
+                Transform Operator
+                  command: python sleep.py
+                  output info:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col0 (type: string)
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: string)
         Reducer 3 
            Reduce Operator Tree:
              Select Operator
-                expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
-                outputColumnNames: _col0, _col1
+                expressions: VALUE._col0 (type: string)
+                outputColumnNames: _col0
                 Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
@@ -70,7 +76,8 @@ STAGE PLANS:
       Processor Tree:
        ListSink
 
-PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: query: SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src1
 #### A masked pattern was here ####
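
For reviewers, a minimal sketch of how the HiveConf.java hunks change runtime behaviour (not part of the patch; assumes Hive on Spark is configured, uses hive.spark.client.future.timeout because it is the varname of SPARK_CLIENT_FUTURE_TIMEOUT, one of the two defaults in hive.spark.rsc.conf.list; the 90s value is arbitrary):

set hive.spark.rsc.conf.list;              -- show which variables are treated as Remote Spark Context properties
set hive.spark.client.future.timeout=90s;  -- a listed variable: isSparkRelatedConfig() now matches it,
                                           -- so the next query re-creates the Spark session

Variables that merely start with hive.spark but are not in the list no longer force a new session, which is the point of replacing the prefix check with the configurable list.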