diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a78b72f..84ee78f 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -642,6 +642,7 @@
// higher compute cost.
HIVE_STATS_NDV_ERROR("hive.stats.ndv.error", (float)20.0),
HIVE_STATS_KEY_PREFIX_MAX_LENGTH("hive.stats.key.prefix.max.length", 150),
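+ // reserved room, subtracted from hive.stats.key.prefix.max.length, for the postfix of the stats key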
+ HIVE_STATS_KEY_PREFIX_RESERVE_LENGTH("hive.stats.key.prefix.reserve.length", 24),
HIVE_STATS_KEY_PREFIX("hive.stats.key.prefix", ""), // internal usage only
// if length of variable length data type cannot be determined this length will be used.
HIVE_STATS_MAX_VARIABLE_LENGTH("hive.stats.max.variable.length", 100),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 7cd8a1f..24f61f3 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1307,6 +1307,16 @@
exceeds a certain length, a hash of the key is used instead. If the value < 0 then hashing
is never used, if the value >= 0 then hashing is used only when the key prefixes length
exceeds that value. The key prefix is defined as everything preceding the task ID in the key.
+ For counter type stats, it is also capped by mapreduce.job.counters.group.name.max, which defaults to 128.
+ </description>
+</property>
+
+<property>
+ <name>hive.stats.key.prefix.reserve.length</name>
+ <value>24</value>
+ <description>
+ Reserved length for the postfix of the stats key. Only meaningful for the counter-type
+ stats database, where it should be large enough to hold the LB (list bucketing) spec.
</description>
</property>
@@ -2127,7 +2137,7 @@
<property>
<name>hive.metastore.integral.jdo.pushdown</name>
- false
+ <value>false</value>
<description>
Allow JDO query pushdown for integral partition columns in metastore. Off by default. This
improves metastore perf for integral columns, especially if there's a large number of partitions.
diff --git data/conf/hive-site.xml data/conf/hive-site.xml
index eac1a3f..6a9d6f7 100644
--- data/conf/hive-site.xml
+++ data/conf/hive-site.xml
@@ -185,4 +185,10 @@
<value>jdbc:derby</value>
<description>The default storatge that stores temporary hive statistics. Currently, jdbc, hbase and counter type is supported</description>
</property>
+
+<property>
+ <name>hive.stats.key.prefix.reserve.length</name>
+ <value>0</value>
+</property>
+
</configuration>
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index bd95161..5af1ec6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1289,6 +1289,8 @@ public int execute() throws CommandNeedRetryException {
Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner>();
DriverContext driverCxt = new DriverContext(runnable, ctx);
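+ // collect StatsTasks from the plan so finished MapRedTasks can later be bound to them as stats sources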
+ driverCxt.prepare(plan);
+
ctx.setHDFSCleanup(true);
SessionState.get().setLastMapRedStatsList(new ArrayList<MapRedStats>());
@@ -1368,6 +1370,8 @@ public int execute() throws CommandNeedRetryException {
}
}
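+ // record the finished task in the DriverContext; a MapRedTask becomes the source task of its StatsTask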
+ driverCxt.finished(tskRun);
+
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
Keys.TASK_RET_CODE, String.valueOf(exitVal));
@@ -1529,6 +1533,8 @@ public void launchTask(Task<? extends Serializable> tsk, String queryId, boolean
TaskResult tskRes = new TaskResult();
TaskRunner tskRun = new TaskRunner(tsk, tskRes);
+ cxt.prepare(tskRun);
+
// Launch Task
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
// Launch it in the parallel mode, as a separate thread only for MR tasks
diff --git ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 1c84523..c51a9c8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -18,13 +18,25 @@
package org.apache.hadoop.hive.ql;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.NodeUtils;
+import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Queue;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.mapred.JobConf;
-
/**
* DriverContext.
*
@@ -38,6 +50,8 @@
Context ctx;
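+ // stats aggregation key prefix -> StatsTask, populated from the query plan in prepare()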
+ final Map<String, StatsTask> statsTasks = new HashMap<String, StatsTask>(1);
+
public DriverContext() {
this.runnable = null;
this.ctx = null;
@@ -82,5 +96,42 @@ public Context getCtx() {
public void incCurJobNo(int amount) {
this.curJobNo = this.curJobNo + amount;
}
-
+
+ public void prepare(QueryPlan plan) {
+ // extract stats keys from StatsTask
+ List<Task<? extends Serializable>> rootTasks = plan.getRootTasks();
+ NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function<StatsTask>() {
+ public void apply(StatsTask statsTask) {
+ statsTasks.put(statsTask.getWork().getAggKey(), statsTask);
+ }
+ });
+ }
+
+ public void prepare(TaskRunner runner) {
+ }
+
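+ // when a MapRedTask finishes, find its stats-gathering FileSinkOperators and register
+ // the task as the source task of the StatsTask with the matching aggregation key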
+ public void finished(TaskRunner runner) {
+ if (statsTasks.isEmpty() || !(runner.getTask() instanceof MapRedTask)) {
+ return;
+ }
+ MapRedTask mapredTask = (MapRedTask) runner.getTask();
+
+ MapWork mapWork = mapredTask.getWork().getMapWork();
+ ReduceWork reduceWork = mapredTask.getWork().getReduceWork();
+ List<Operator> operators = new ArrayList<Operator>(mapWork.getAliasToWork().values());
+ if (reduceWork != null) {
+ operators.add(reduceWork.getReducer());
+ }
+ final List<String> statKeys = new ArrayList<String>(1);
+ NodeUtils.iterate(operators, FileSinkOperator.class, new Function<FileSinkOperator>() {
+ public void apply(FileSinkOperator fsOp) {
+ if (fsOp.getConf().isGatherStats()) {
+ statKeys.add(fsOp.getConf().getStatsAggPrefix());
+ }
+ }
+ });
+ for (String statKey : statKeys) {
+ statsTasks.get(statKey).getWork().setSourceTask(mapredTask);
+ }
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index b6c09eb..fb0b772 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -924,7 +924,7 @@ private void publishStats() throws HiveException {
postfix = Utilities.join(lbSpec, taskID);
}
prefix = Utilities.join(prefix, spSpec, dpSpec);
- prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, postfix.length());
+ prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength);
String key = Utilities.join(prefix, postfix);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java
new file mode 100644
index 0000000..5aae311
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.lib.Node;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Simple node iterating utils
+ */
+public class NodeUtils {
+
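+ /**
+ * Applies the function to every task of the given type reachable from the given roots,
+ * following dependent tasks and visiting each task at most once.
+ */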
+ public static <T> void iterateTask(Collection<? extends Task<?>> tasks, Class<T> clazz, Function<T> function) {
+ Set<Task<?>> visited = new HashSet<Task<?>>();
+ for (Task<?> task : tasks) {
+ iterateTask(task, clazz, function, visited);
+ }
+ return;
+ }
+
+ private static <T> void iterateTask(Task<?> task, Class<T> clazz, Function<T> function, Set<Task<?>> visited) {
+ if (!visited.add(task)) {
+ return;
+ }
+ if (clazz.isInstance(task)) {
+ function.apply(clazz.cast(task));
+ }
+ // this is for ConditionalTask
+ if (task.getDependentTasks() != null) {
+ for (Task<?> dependent : task.getDependentTasks()) {
+ iterateTask(dependent, clazz, function, visited);
+ }
+ }
+ }
+
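+ /**
+ * Applies the function to every node of the given type reachable from the given roots,
+ * following children and visiting each node at most once.
+ */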
+ public static <T> void iterate(Collection<? extends Node> nodes, Class<T> clazz, Function<T> function) {
+ Set<Node> visited = new HashSet<Node>();
+ for (Node task : nodes) {
+ iterate(task, clazz, function, visited);
+ }
+ return;
+ }
+
+ private static <T> void iterate(Node node, Class<T> clazz, Function<T> function, Set<Node> visited) {
+ if (!visited.add(node)) {
+ return;
+ }
+ if (clazz.isInstance(node)) {
+ function.apply(clazz.cast(node));
+ }
+ if (node.getChildren() != null) {
+ for (Node child : node.getChildren()) {
+ iterate(child, clazz, function, visited);
+ }
+ }
+ }
+
+ public static interface Function<T> {
+ void apply(T argument);
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index a22a4c2..5f6b100 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -71,7 +71,7 @@ public StatsTask() {
}
@Override
- protected void receiveFeed(FeedType feedType, Object feedValue) {
+ public void receiveFeed(FeedType feedType, Object feedValue) {
// this method should be called by MoveTask when there are dynamic partitions generated
if (feedType == FeedType.DYNAMIC_PARTITIONS) {
assert feedValue instanceof List<?>;
@@ -171,9 +171,7 @@ private int aggregateStats() {
if (statsAggregator != null) {
String prefix = getAggregationPrefix(counterStat, table, null);
- String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength, 0);
- updateStats(statsAggregator, parameters, aggKey, atomic);
- statsAggregator.cleanUp(aggKey);
+ updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic);
}
updateQuickStats(wh, parameters, tTable.getSd());
@@ -207,9 +205,7 @@ private int aggregateStats() {
if (statsAggregator != null) {
String prefix = getAggregationPrefix(counterStat, table, partn);
- String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength, 0);
- updateStats(statsAggregator, parameters, aggKey, atomic);
- statsAggregator.cleanUp(aggKey);
+ updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic);
}
updateQuickStats(wh, parameters, tPart.getSd());
@@ -296,7 +292,10 @@ private boolean existStats(Map<String, String> parameters) {
}
private void updateStats(StatsAggregator statsAggregator,
- Map<String, String> parameters, String aggKey, boolean atomic) throws HiveException {
+ Map<String, String> parameters, String prefix, int maxPrefixLength, boolean atomic)
+ throws HiveException {
+
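+ // hash the prefix once here so aggregation and cleanup below use the same key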
+ String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength);
for (String statType : StatsSetupConst.statsRequireCompute) {
String value = statsAggregator.aggregateStats(aggKey, statType);
@@ -317,6 +316,7 @@ private void updateStats(StatsAggregator statsAggregator,
}
}
}
+ statsAggregator.cleanUp(aggKey);
}
private void updateQuickStats(Warehouse wh, Map<String, String> parameters,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 0e3cfe7..cec5f43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -287,18 +287,17 @@ private void publishStats() throws HiveException {
String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf));
Map<String, String> statsToPublish = new HashMap<String, String>();
+ boolean counterStats = statsPublisher instanceof CounterStatsPublisher;
+
for (String pspecs : stats.keySet()) {
statsToPublish.clear();
String prefix = Utilities.join(conf.getStatsAggPrefix(), pspecs);
- String key;
int maxKeyLength = conf.getMaxStatsKeyPrefixLength();
- if (statsPublisher instanceof CounterStatsPublisher) {
- key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, 0);
- } else {
+ String key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength);
+ if (!(statsPublisher instanceof CounterStatsPublisher)) {
// stats publisher except counter type needs postfix 'taskID'
- prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, taskID.length());
- key = prefix + taskID;
+ key = Utilities.join(prefix, taskID);
}
for(String statType : stats.get(pspecs).getStoredStats()) {
statsToPublish.put(statType, Long.toString(stats.get(pspecs).getStat(statType)));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b9b5b4a..75c79c2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2401,12 +2401,11 @@ public static StatsPublisher getStatsPublisher(JobConf jc) {
* @param maxPrefixLength
* @return
*/
- public static String getHashedStatsPrefix(String statsPrefix,
- int maxPrefixLength, int postfixLength) {
+ public static String getHashedStatsPrefix(String statsPrefix, int maxPrefixLength) {
+ // todo: this might return a prefix longer than maxPrefixLength (if set)
+ // when maxPrefixLength < 17, which would make stat values invalid
+ // (especially for 'counter' type)
- if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength - postfixLength) {
+ if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength) {
try {
MessageDigest digester = MessageDigest.getInstance("MD5");
digester.update(statsPrefix.getBytes());
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
index e319fe4..12953af 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.stats.CounterStatsPublisher;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.shims.CombineHiveKey;
@@ -139,11 +140,13 @@ private void publishStats() throws HiveException {
throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
}
+ int maxPrefixLength = StatsFactory.getMaxPrefixLength(jc);
// construct key used to store stats in intermediate db
- String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(jc));
- String keyPrefix = Utilities.getHashedStatsPrefix(
- statsAggKeyPrefix, StatsFactory.getMaxPrefixLength(jc), taskID.length());
- String key = keyPrefix + taskID;
+ String key = Utilities.getHashedStatsPrefix(statsAggKeyPrefix, maxPrefixLength);
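+ // counter-type publishers use the hashed prefix alone; other publishers append the task ID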
+ if (!(statsPublisher instanceof CounterStatsPublisher)) {
+ String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(jc));
+ key = Utilities.join(key, taskID);
+ }
// construct statistics to be stored
Map<String, String> statsToPublish = new HashMap<String, String>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
index 0f0e825..66d4d4a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
@@ -50,6 +50,9 @@
private boolean isPartialScanAnalyzeCommand = false;
+ // sourceTask for TS is not changed (currently) but that of FS might be changed
+ // by various optimizers (auto.convert.join, for example)
+ // so this is set by DriverContext at runtime
private transient Task sourceTask;
public StatsWork() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
index 2fb880d..3216cf3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
@@ -30,6 +30,7 @@
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSDBCLASS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX_RESERVE_LENGTH;
/**
* A factory of stats publisher and aggregator implementations of the
@@ -51,6 +52,10 @@ public static int getMaxPrefixLength(Configuration conf) {
maxPrefixLength = maxPrefixLength < 0 ? groupNameMax :
Math.min(maxPrefixLength, groupNameMax);
}
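+ // keep room for the key postfix (e.g. the LB spec) so the full stats key stays within the limit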
+ if (maxPrefixLength > 0) {
+ int reserve = HiveConf.getIntVar(conf, HIVE_STATS_KEY_PREFIX_RESERVE_LENGTH);
+ return reserve < 0 ? maxPrefixLength : maxPrefixLength - reserve;
+ }
return maxPrefixLength;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index 41e237f..384b49e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.stats;
import java.util.List;