diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index 83517ce..b44f92f 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.common;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -47,4 +50,20 @@ public static void cleanUpScratchDir(HiveConf hiveConf) {
}
}
+ /**
+ * Get the InetAddress of the machine with the given host name.
+ * @param hostname The name of the host
+ * @return The network address of the host
+ * @throws UnknownHostException
+ */
+ public static InetAddress getHostAddress(String hostname) throws UnknownHostException {
+ InetAddress serverIPAddress;
+ if (hostname != null && !hostname.isEmpty()) {
+ serverIPAddress = InetAddress.getByName(hostname);
+ } else {
+ serverIPAddress = InetAddress.getLocalHost();
+ }
+ return serverIPAddress;
+ }
+
}
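
A minimal usage sketch of the new helper (the surrounding class and argument handling are illustrative, not part of the patch): when the configured host name is null or empty, the call falls back to InetAddress.getLocalHost().

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.hadoop.hive.common.ServerUtils;

public class BindAddressExample {
  public static void main(String[] args) throws UnknownHostException {
    // Empty or null host name -> address of the local machine.
    String configuredHost = args.length > 0 ? args[0] : null;
    InetAddress addr = ServerUtils.getHostAddress(configuredHost);
    System.out.println(addr.getHostAddress());
  }
}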
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 97fe7bc..9bf0d66 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -386,6 +386,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
// a symbolic name to reference in the Hive source code. Properties with non-null
// values will override any values set in the underlying Hadoop configuration.
HADOOPBIN("hadoop.bin.path", findHadoopBinary(), "", true),
+ YARNBIN("yarn.bin.path", findYarnBinary(), "", true),
HIVE_FS_HAR_IMPL("fs.har.impl", "org.apache.hadoop.hive.shims.HiveHarFileSystem",
"The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop versions less than 0.20"),
MAPREDMAXSPLITSIZE(FileInputFormat.SPLIT_MAXSIZE, 256000000L, "", true),
@@ -2597,6 +2598,11 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
"Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
"Name of the SASL mechanism to use for authentication."),
+ SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
+ "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " +
+ "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host." +
+ "This is only necessary if the host has mutiple network addresses and if a different network address other than " +
+ "hive.server2.thrift.bind.host is to be used."),
SPARK_DYNAMIC_PARTITION_PRUNING(
"hive.spark.dynamic.partition.pruning", false,
"When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" +
@@ -2787,16 +2793,27 @@ public String toString() {
}
private static String findHadoopBinary() {
+ String val = findHadoopHome();
+ // if we can't find the hadoop home we can at least try /usr/bin/hadoop
+ val = (val == null ? File.separator + "usr" : val)
+ + File.separator + "bin" + File.separator + "hadoop";
+ // Launch hadoop command file on windows.
+ return val + (Shell.WINDOWS ? ".cmd" : "");
+ }
+
+ private static String findYarnBinary() {
+ String val = findHadoopHome();
+ val = (val == null ? "yarn" : val + File.separator + "bin" + File.separator + "yarn");
+ return val + (Shell.WINDOWS ? ".cmd" : "");
+ }
+
+ private static String findHadoopHome() {
String val = System.getenv("HADOOP_HOME");
// In Hadoop 1.X and Hadoop 2.X HADOOP_HOME is gone and replaced with HADOOP_PREFIX
if (val == null) {
val = System.getenv("HADOOP_PREFIX");
}
- // and if all else fails we can at least try /usr/bin/hadoop
- val = (val == null ? File.separator + "usr" : val)
- + File.separator + "bin" + File.separator + "hadoop";
- // Launch hadoop command file on windows.
- return val + (Shell.WINDOWS ? ".cmd" : "");
+ return val;
}
public String getDefaultValue() {
@@ -2916,7 +2933,8 @@ public boolean isHiddenConfig(String name) {
private boolean isSparkRelatedConfig(String name) {
boolean result = false;
if (name.startsWith("spark")) { // Spark property.
- result = true;
+ // for now we don't support changing spark app name on the fly
+ result = !name.equals("spark.app.name");
} else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode.
String sparkMaster = get("spark.master");
if (sparkMaster != null &&
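
The refactor above moves the HADOOP_HOME/HADOOP_PREFIX lookup into a shared findHadoopHome() so that both findHadoopBinary() and the new findYarnBinary() reuse it. A standalone sketch of the resulting resolution behavior (class and method names here are illustrative; the real methods are private to HiveConf):

import java.io.File;

public class BinaryPathSketch {
  // Mirrors findHadoopHome(): HADOOP_HOME first, then HADOOP_PREFIX.
  static String findHome() {
    String val = System.getenv("HADOOP_HOME");
    return val != null ? val : System.getenv("HADOOP_PREFIX");
  }

  public static void main(String[] args) {
    String home = findHome();
    boolean windows = System.getProperty("os.name").startsWith("Windows");
    // hadoop: falls back to /usr/bin/hadoop when no home directory is set.
    String hadoop = (home == null ? File.separator + "usr" : home)
        + File.separator + "bin" + File.separator + "hadoop" + (windows ? ".cmd" : "");
    // yarn: falls back to a bare "yarn" resolved from the PATH when no home directory is set.
    String yarn = (home == null ? "yarn"
        : home + File.separator + "bin" + File.separator + "yarn") + (windows ? ".cmd" : "");
    System.out.println(hadoop);
    System.out.println(yarn);
  }
}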
diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
new file mode 100644
index 0000000..b87de09
--- /dev/null
+++ b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.genericudf.example;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * GenericUDFAdd10.
+ *
+ */
+@Description(name = "add10",
+ value = "_FUNC_(x) - returns 10 plus the original value of x",
+ extended = "Example:\n"
+ + " > SELECT _FUNC_(0) FROM src LIMIT 1;\n"
+ + " 10\n"
+ + " > SELECT _FUNC_(-5) FROM src LIMIT 1;\n" + " 5")
+public class GenericUDFAdd10 extends GenericUDF {
+ private transient PrimitiveCategory inputType;
+ private final DoubleWritable resultDouble = new DoubleWritable();
+ private final LongWritable resultLong = new LongWritable();
+ private final IntWritable resultInt = new IntWritable();
+ private final HiveDecimalWritable resultDecimal = new HiveDecimalWritable();
+ private transient PrimitiveObjectInspector argumentOI;
+ private transient Converter inputConverter;
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+ if (arguments.length != 1) {
+ throw new UDFArgumentLengthException(
+ "ADD10() requires 1 argument, got " + arguments.length);
+ }
+
+ if (arguments[0].getCategory() != Category.PRIMITIVE) {
+ throw new UDFArgumentException(
+ "ADD10 only takes primitive types, got " + arguments[0].getTypeName());
+ }
+ argumentOI = (PrimitiveObjectInspector) arguments[0];
+
+ inputType = argumentOI.getPrimitiveCategory();
+ ObjectInspector outputOI = null;
+ switch (inputType) {
+ case SHORT:
+ case BYTE:
+ case INT:
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+ outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+ break;
+ case LONG:
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+ outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ break;
+ case FLOAT:
+ case STRING:
+ case DOUBLE:
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ break;
+ case DECIMAL:
+ outputOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+ ((PrimitiveObjectInspector) arguments[0]).getTypeInfo());
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ outputOI);
+ break;
+ default:
+ throw new UDFArgumentException(
+ "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+ }
+ return outputOI;
+ }
+
+ @Override
+ public Object evaluate(DeferredObject[] arguments) throws HiveException {
+ Object valObject = arguments[0].get();
+ if (valObject == null) {
+ return null;
+ }
+ switch (inputType) {
+ case SHORT:
+ case BYTE:
+ case INT:
+ valObject = inputConverter.convert(valObject);
+ resultInt.set(10 + ((IntWritable) valObject).get());
+ return resultInt;
+ case LONG:
+ valObject = inputConverter.convert(valObject);
+ resultLong.set(10 + ((LongWritable) valObject).get());
+ return resultLong;
+ case FLOAT:
+ case STRING:
+ case DOUBLE:
+ valObject = inputConverter.convert(valObject);
+ resultDouble.set(10.0 + ((DoubleWritable) valObject).get());
+ return resultDouble;
+ case DECIMAL:
+ HiveDecimalObjectInspector decimalOI =
+ (HiveDecimalObjectInspector) argumentOI;
+ HiveDecimalWritable val = decimalOI.getPrimitiveWritableObject(valObject);
+
+ if (val != null) {
+ resultDecimal.set(val.getHiveDecimal().add(HiveDecimal.create("10")));
+ val = resultDecimal;
+ }
+ return val;
+ default:
+ throw new UDFArgumentException(
+ "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+ }
+ }
+
+ @Override
+ public String getDisplayString(String[] children) {
+ return getStandardDisplayString("add10", children);
+ }
+
+}
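
A test-style invocation sketch of the new UDF (not part of the patch; the class name is illustrative). With an int argument the INT branch is taken, so the result is an IntWritable holding the input plus 10:

import org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

public class GenericUDFAdd10Example {
  public static void main(String[] args) throws Exception {
    GenericUDFAdd10 udf = new GenericUDFAdd10();
    ObjectInspector[] argOIs =
        { PrimitiveObjectInspectorFactory.javaIntObjectInspector };
    udf.initialize(argOIs);
    DeferredObject[] input = { new DeferredJavaObject(5) };
    IntWritable result = (IntWritable) udf.evaluate(input);
    System.out.println(result.get()); // 15
    udf.close();
  }
}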
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index aef7877..f1d9ddc 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -196,7 +196,7 @@
     <name>spark.master</name>
-    <value>yarn-client</value>
+    <value>yarn-cluster</value>
@@ -231,7 +231,22 @@
     <name>spark.executor.memory</name>
-    <value>512m</value>
+    <value>1g</value>
+  </property>
+
+  <property>
+    <name>spark.yarn.executor.memoryOverhead</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <name>spark.driver.memory</name>
+    <value>1g</value>
+  </property>
+
+  <property>
+    <name>spark.yarn.driver.memoryOverhead</name>
+    <value>0</value>
@@ -255,4 +270,9 @@
     Internal marker for test. Used for masking env-dependent values
+  <property>
+    <name>hive.spark.client.connect.timeout</name>
+    <value>30000ms</value>
+  </property>
+
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 8318c3a..ec6a2c7 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1091,8 +1091,6 @@ spark.query.files=add_part_multiple.q, \
stats7.q, \
stats8.q, \
stats9.q, \
- stats_counter.q, \
- stats_counter_partitioned.q, \
stats_noscan_1.q, \
stats_noscan_2.q, \
stats_only_null.q, \
@@ -1262,6 +1260,7 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
empty_dir_in_table.q,\
external_table_with_space_in_location_path.q,\
file_with_header_footer.q,\
+ gen_udf_example_add10.q,\
import_exported_table.q,\
index_bitmap3.q,\
index_bitmap_auto.q,\
@@ -1297,8 +1296,6 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
schemeAuthority2.q,\
scriptfile1.q,\
scriptfile1_win.q,\
- stats_counter.q,\
- stats_counter_partitioned.q,\
temp_table_external.q,\
truncate_column_buckets.q,\
uber_reduce.q,\
diff --git a/pom.xml b/pom.xml
index 2066518..2d2a3de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
4.0.4
0.8.2
2.2.0
- 1.5.0
+ 1.6.0
2.10
2.10.4
1.1
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 3289cfc..a659f7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -440,13 +440,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
cntr = 1;
logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
- String suffix = Integer.toString(conf.getDestTableId());
- String fullName = conf.getTableInfo().getTableName();
- if (fullName != null) {
- suffix = suffix + "_" + fullName.toLowerCase();
- }
-
- statsMap.put(Counter.RECORDS_OUT + "_" + suffix, row_count);
+ statsMap.put(getCounterName(Counter.RECORDS_OUT), row_count);
} catch (HiveException e) {
throw e;
} catch (Exception e) {
@@ -455,6 +449,15 @@ protected void initializeOp(Configuration hconf) throws HiveException {
}
}
+ public String getCounterName(Counter counter) {
+ String suffix = Integer.toString(conf.getDestTableId());
+ String fullName = conf.getTableInfo().getTableName();
+ if (fullName != null) {
+ suffix = suffix + "_" + fullName.toLowerCase();
+ }
+ return counter + "_" + suffix;
+ }
+
private void logOutputFormatError(Configuration hconf, HiveException ex) {
StringWriter errorWriter = new StringWriter();
errorWriter.append("Failed to create output format; configuration: ");
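
The change above extracts the counter-name construction into a public getCounterName(Counter) so callers outside initializeOp can compute the same name. A small illustration of the format it produces (the table id and name are made-up values):

public class FileSinkCounterNameSketch {
  public static void main(String[] args) {
    // Format: <counter>_<destTableId>[_<lowercased destination table name>]
    String suffix = Integer.toString(1);          // stands in for conf.getDestTableId()
    String fullName = "default.dest_table";       // stands in for conf.getTableInfo().getTableName()
    if (fullName != null) {
      suffix = suffix + "_" + fullName.toLowerCase();
    }
    System.out.println("RECORDS_OUT" + "_" + suffix); // RECORDS_OUT_1_default.dest_table
  }
}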
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 74b4802..e692460 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -170,11 +170,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
cntr = 1;
logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
- String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
- if (context != null && !context.isEmpty()) {
- context = "_" + context.replace(" ","_");
- }
- statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+ statsMap.put(getCounterName(Counter.RECORDS_OUT_INTERMEDIATE, hconf), recordCounter);
List<ExprNodeDesc> keys = conf.getKeyCols();
@@ -256,6 +252,14 @@ protected void initializeOp(Configuration hconf) throws HiveException {
}
}
+ public String getCounterName(Counter counter, Configuration hconf) {
+ String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ", "_");
+ }
+ return counter + context;
+ }
+
/**
* Initializes array of ExprNodeEvaluator. Adds Union field for distinct
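
Similarly, ReduceSinkOperator now exposes getCounterName(Counter, Configuration), which suffixes the counter with the operator context from the job configuration, spaces replaced by underscores. An illustration with a made-up context value:

public class ReduceSinkCounterNameSketch {
  public static void main(String[] args) {
    String context = "Map 1"; // stands in for hconf.get(Operator.CONTEXT_NAME_KEY, "")
    if (context != null && !context.isEmpty()) {
      context = "_" + context.replace(" ", "_");
    }
    System.out.println("RECORDS_OUT_INTERMEDIATE" + context); // RECORDS_OUT_INTERMEDIATE_Map_1
  }
}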
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 259c12f..993d02b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -27,8 +27,10 @@
import java.util.Set;
import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -66,10 +68,16 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
Map<String, String> sparkConf = new HashMap<String, String>();
+ HBaseConfiguration.addHbaseResources(hiveConf);
// set default spark configurations.
sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
- sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
+ final String appNameKey = "spark.app.name";
+ String appName = hiveConf.get(appNameKey);
+ if (appName == null) {
+ appName = SPARK_DEFAULT_APP_NAME;
+ }
+ sparkConf.put(appNameKey, appName);
sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
sparkConf.put("spark.kryo.referenceTracking", SPARK_DEFAULT_REFERENCE_TRACKING);
@@ -133,7 +141,21 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
LOG.info(String.format(
"load yarn property from hive configuration in %s mode (%s -> %s).",
sparkMaster, propertyName, value));
+ } else if (propertyName.equals(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)) {
+ String value = hiveConf.get(propertyName);
+ if (value != null && !value.isEmpty()) {
+ sparkConf.put("spark.hadoop." + propertyName, value);
+ }
+ } else if (propertyName.startsWith("hbase")) {
+ // Add HBase-related configuration to Spark because in secure mode Spark needs it
+ // to generate the HBase delegation token. This is a temporary workaround for a
+ // Spark limitation.
+ String value = hiveConf.get(propertyName);
+ sparkConf.put("spark.hadoop." + propertyName, value);
+ LOG.info(String.format(
+ "load HBase configuration (%s -> %s).", propertyName, value));
}
+
if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
String value = RpcConfiguration.getValue(hiveConf, propertyName);
sparkConf.put(propertyName, value);
@@ -152,6 +174,15 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
classes.add(HiveKey.class.getName());
sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes));
+ // set yarn queue name
+ final String sparkQueueNameKey = "spark.yarn.queue";
+ if (sparkMaster.startsWith("yarn") && hiveConf.get(sparkQueueNameKey) == null) {
+ String queueName = hiveConf.get("mapreduce.job.queuename");
+ if (queueName != null) {
+ sparkConf.put(sparkQueueNameKey, queueName);
+ }
+ }
+
return sparkConf;
}
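
With the changes above, initiateSparkConf keeps a user-supplied spark.app.name instead of overwriting it, copies fs.defaultFS and hbase.* properties under the spark.hadoop. prefix, and, on YARN, falls back to mapreduce.job.queuename when spark.yarn.queue is unset. A hedged sketch of how that could be exercised (values are illustrative, and HBase jars must be on the classpath because of the addHbaseResources call):

import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;

public class InitiateSparkConfExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.set("spark.master", "yarn-cluster");
    conf.set("spark.app.name", "my-hive-on-spark-app"); // no longer overwritten with the default
    conf.set("mapreduce.job.queuename", "etl");         // expected to be copied to spark.yarn.queue
    Map<String, String> sparkConf = HiveSparkClientFactory.initiateSparkConf(conf);
    System.out.println(sparkConf.get("spark.app.name"));   // my-hive-on-spark-app
    System.out.println(sparkConf.get("spark.yarn.queue")); // etl
  }
}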
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 6380774..11e7116 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -297,8 +297,9 @@ public Serializable call(JobContext jc) throws Exception {
// may need to load classes from this jar in other threads.
Map<String, Long> addedJars = jc.getAddedJars();
if (addedJars != null && !addedJars.isEmpty()) {
- SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
- localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
+ List<String> localAddedJars = SparkClientUtilities.addToClassPath(addedJars,
+ localJobConf, jc.getLocalTmpDir());
+ localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(localAddedJars, ";"));
}
Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index eb93aca..26cce1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hive.ql.exec.spark;
import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -30,11 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -44,7 +38,6 @@
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
@@ -58,25 +51,15 @@
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.spark.counter.SparkCounters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
@@ -86,7 +69,6 @@
private static final LogHelper console = new LogHelper(LOG);
private final PerfLogger perfLogger = SessionState.getPerfLogger();
private static final long serialVersionUID = 1L;
- private SparkCounters sparkCounters;
@Override
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext,
@@ -106,7 +88,7 @@ public int execute(DriverContext driverContext) {
sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
SparkWork sparkWork = getWork();
- sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
+ sparkWork.setRequiredCounterPrefix(getOperatorCounters());
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
@@ -116,8 +98,6 @@ public int execute(DriverContext driverContext) {
rc = jobRef.monitorJob();
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
if (rc == 0) {
- sparkCounters = sparkJobStatus.getCounter();
- // for RSC, we should get the counters after job has finished
SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
if (LOG.isInfoEnabled() && sparkStatistics != null) {
LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
@@ -233,10 +213,6 @@ public String getName() {
return ((ReduceWork) children.get(0)).getReducer();
}
- public SparkCounters getSparkCounters() {
- return sparkCounters;
- }
-
/**
* Set the number of reducers for the spark work.
*/
@@ -250,126 +226,6 @@ private void printConfigInfo() throws IOException {
console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=");
}
- private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
- Map<String, List<String>> counters = getOperatorCounters();
- StatsTask statsTask = getStatsTaskInChildTasks(this);
- String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- // fetch table prefix if SparkTask try to gather table statistics based on counter.
- if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
- List<String> prefixes = getRequiredCounterPrefix(statsTask);
- for (String prefix : prefixes) {
- List<String> counterGroup = counters.get(prefix);
- if (counterGroup == null) {
- counterGroup = new LinkedList<String>();
- counters.put(prefix, counterGroup);
- }
- counterGroup.add(StatsSetupConst.ROW_COUNT);
- counterGroup.add(StatsSetupConst.RAW_DATA_SIZE);
- }
- }
- return counters;
- }
-
- private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
- List<String> prefixs = new LinkedList<String>();
- StatsWork statsWork = statsTask.getWork();
- String tablePrefix = getTablePrefix(statsWork);
- List