diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 0af7644..c605488 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1076,8 +1076,6 @@ spark.query.files=add_part_multiple.q, \
   stats7.q, \
   stats8.q, \
   stats9.q, \
-  stats_counter.q, \
-  stats_counter_partitioned.q, \
   stats_noscan_1.q, \
   stats_noscan_2.q, \
   stats_only_null.q, \
@@ -1282,8 +1280,6 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   schemeAuthority2.q,\
   scriptfile1.q,\
   scriptfile1_win.q,\
-  stats_counter.q,\
-  stats_counter_partitioned.q,\
   temp_table_external.q,\
   truncate_column_buckets.q,\
   uber_reduce.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index d215873..ec0fdea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -133,7 +134,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
         LOG.info(String.format(
           "load yarn property from hive configuration in %s mode (%s -> %s).",
           sparkMaster, propertyName, value));
-      } else if (propertyName.equals(HiveConf.ConfVars.HADOOPFS.varname)) {
+      } else if (propertyName.equals(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)) {
         String value = hiveConf.get(propertyName);
         if (value != null && !value.isEmpty()) {
           sparkConf.put("spark.hadoop." + propertyName, value);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index eaeffee..6e13f7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -30,10 +28,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -42,7 +37,6 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.ScriptOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
@@ -56,25 +50,15 @@
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.spark.counter.SparkCounters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
@@ -84,7 +68,6 @@
   private static final LogHelper console = new LogHelper(LOG);
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private static final long serialVersionUID = 1L;
-  private SparkCounters sparkCounters;
 
   @Override
   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
@@ -103,7 +86,7 @@ public int execute(DriverContext driverContext) {
       sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
 
       SparkWork sparkWork = getWork();
-      sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
+      sparkWork.setRequiredCounterPrefix(getOperatorCounters());
 
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
@@ -113,8 +96,6 @@ public int execute(DriverContext driverContext) {
       rc = jobRef.monitorJob();
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (rc == 0) {
-        sparkCounters = sparkJobStatus.getCounter();
-        // for RSC, we should get the counters after job has finished
         SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
           LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
@@ -230,10 +211,6 @@ public String getName() {
     return ((ReduceWork) children.get(0)).getReducer();
   }
 
-  public SparkCounters getSparkCounters() {
-    return sparkCounters;
-  }
-
   /**
    * Set the number of reducers for the spark work.
    */
@@ -247,127 +224,6 @@ private void printConfigInfo() throws IOException {
     console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
   }
 
-  private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
-    Map<String, List<String>> counters = getOperatorCounters();
-    StatsTask statsTask = getStatsTaskInChildTasks(this);
-    String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    // fetch table prefix if SparkTask try to gather table statistics based on counter.
-    if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
-      List<String> prefixes = getRequiredCounterPrefix(statsTask);
-      for (String prefix : prefixes) {
-        List<String> counterGroup = counters.get(prefix);
-        if (counterGroup == null) {
-          counterGroup = new LinkedList<String>();
-          counters.put(prefix, counterGroup);
-        }
-        counterGroup.add(StatsSetupConst.ROW_COUNT);
-        counterGroup.add(StatsSetupConst.RAW_DATA_SIZE);
-      }
-    }
-    return counters;
-  }
-
-  private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
-    List<String> prefixs = new LinkedList<String>();
-    StatsWork statsWork = statsTask.getWork();
-    String tablePrefix = getTablePrefix(statsWork);
-    List<Map<String, String>> partitionSpecs = getPartitionSpecs(statsWork);
-    int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf);
-
-    if (partitionSpecs == null) {
-      prefixs.add(Utilities.getHashedStatsPrefix(tablePrefix, maxPrefixLength));
-    } else {
-      for (Map<String, String> partitionSpec : partitionSpecs) {
-        String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partitionSpec));
-        prefixs.add(Utilities.getHashedStatsPrefix(prefixWithPartition, maxPrefixLength));
-      }
-    }
-
-    return prefixs;
-  }
-
-  private String getTablePrefix(StatsWork work) throws HiveException {
-    String tableName;
-    if (work.getLoadTableDesc() != null) {
-      tableName = work.getLoadTableDesc().getTable().getTableName();
-    } else if (work.getTableSpecs() != null) {
-      tableName = work.getTableSpecs().tableName;
-    } else {
-      tableName = work.getLoadFileDesc().getDestinationCreateTable();
-    }
-    Table table;
-    try {
-      table = db.getTable(tableName);
-    } catch (HiveException e) {
-      LOG.warn("Failed to get table:" + tableName);
-      // For CTAS query, table does not exist in this period, just use table name as prefix.
-      return tableName.toLowerCase();
-    }
-    return table.getDbName() + "." + table.getTableName();
-  }
-
-  private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> rootTask) {
-
-    List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
-    if (childTasks == null) {
-      return null;
-    }
-    for (Task<? extends Serializable> task : childTasks) {
-      if (task instanceof StatsTask) {
-        return (StatsTask) task;
-      } else {
-        Task<? extends Serializable> childTask = getStatsTaskInChildTasks(task);
-        if (childTask instanceof StatsTask) {
-          return (StatsTask) childTask;
-        } else {
-          continue;
-        }
-      }
-    }
-
-    return null;
-  }
-
-  private List<Map<String, String>> getPartitionSpecs(StatsWork work) throws HiveException {
-    if (work.getLoadFileDesc() != null) {
-      return null; //we are in CTAS, so we know there are no partitions
-    }
-    Table table;
-    List<Map<String, String>> partitionSpecs = new ArrayList<Map<String, String>>();
-
-    if (work.getTableSpecs() != null) {
-
-      // ANALYZE command
-      TableSpec tblSpec = work.getTableSpecs();
-      table = tblSpec.tableHandle;
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      // get all partitions that matches with the partition spec
-      List<Partition> partitions = tblSpec.partitions;
-      if (partitions != null) {
-        for (Partition partition : partitions) {
-          partitionSpecs.add(partition.getSpec());
-        }
-      }
-    } else if (work.getLoadTableDesc() != null) {
-
-      // INSERT OVERWRITE command
-      LoadTableDesc tbd = work.getLoadTableDesc();
-      table = db.getTable(tbd.getTable().getTableName());
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      DynamicPartitionCtx dpCtx = tbd.getDPCtx();
-      if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
-        // we could not get dynamic partition information before SparkTask execution.
-      } else { // static partition
-        partitionSpecs.add(tbd.getPartitionSpec());
-      }
-    }
-    return partitionSpecs;
-  }
-
   private Map<String, List<String>> getOperatorCounters() {
     String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
     Map<String, List<String>> counters = new HashMap<String, List<String>>();
diff --git a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
index 984554d..cab2ec8 100644
--- a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
+++ b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
@@ -43,6 +43,7 @@ STAGE PLANS:
               key expressions: _col0 (type: int), _col1 (type: double)
               sort order: -+
               Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              TopN Hash Memory Usage: 0.1
       Reduce Operator Tree:
         Select Operator
           expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
diff --git a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
index 05ec1f5..493d0a4 100644
--- a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
+++ b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
@@ -48,6 +48,7 @@ STAGE PLANS:
                 key expressions: _col0 (type: int), _col1 (type: double)
                 sort order: -+
                 Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                TopN Hash Memory Usage: 0.1
             Reducer 2
         Reduce Operator Tree:
           Select Operator