diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a78b72f..84ee78f 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -642,6 +642,7 @@
     // higher compute cost.
     HIVE_STATS_NDV_ERROR("hive.stats.ndv.error", (float)20.0),
     HIVE_STATS_KEY_PREFIX_MAX_LENGTH("hive.stats.key.prefix.max.length", 150),
+    HIVE_STATS_KEY_PREFIX_RESERVE_LENGTH("hive.stats.key.prefix.reserve.length", 24),
     HIVE_STATS_KEY_PREFIX("hive.stats.key.prefix", ""), // internal usage only
     // if length of variable length data type cannot be determined this length will be used.
     HIVE_STATS_MAX_VARIABLE_LENGTH("hive.stats.max.variable.length", 100),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 7cd8a1f..24f61f3 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1307,6 +1307,16 @@
     exceeds a certain length, a hash of the key is used instead. If the value < 0 then hashing
     is never used, if the value >= 0 then hashing is used only when the key prefixes length
     exceeds that value. The key prefix is defined as everything preceding the task ID in the key.
+    For counter type stats, it's maxed by mapreduce.job.counters.group.name.max, which is by default 128.
+  </description>
+</property>
+
+<property>
+  <name>hive.stats.key.prefix.reserve.length</name>
+  <value>24</value>
+  <description>
+    Reserved length for postfix of stats key. Only meaningful for counter type stat db and
+    should be enough for the length of LB spec.
   </description>
 </property>
 
@@ -2127,7 +2137,7 @@
 <property>
   <name>hive.metastore.integral.jdo.pushdown</name>
-  <value>false</value>
+  <value>false</value>
   <description>
     Allow JDO query pushdown for integral partition columns in metastore. Off by default. This
     improves metastore perf for integral columns, especially if there's a large number of partitions.
   </description>
diff --git data/conf/hive-site.xml data/conf/hive-site.xml
index eac1a3f..6a9d6f7 100644
--- data/conf/hive-site.xml
+++ data/conf/hive-site.xml
@@ -185,4 +185,10 @@
   <value>jdbc:derby</value>
   <description>The default storatge that stores temporary hive statistics. Currently, jdbc, hbase and counter type is supported</description>
 </property>
+
+<property>
+  <name>hive.stats.key.prefix.reserve.length</name>
+  <value>0</value>
+</property>
+
 </configuration>
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index bd95161..5af1ec6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1289,6 +1289,8 @@ public int execute() throws CommandNeedRetryException {
     Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner>();
 
     DriverContext driverCxt = new DriverContext(runnable, ctx);
+    driverCxt.prepare(plan);
+
     ctx.setHDFSCleanup(true);
 
     SessionState.get().setLastMapRedStatsList(new ArrayList<MapRedStats>());
@@ -1368,6 +1370,8 @@ public int execute() throws CommandNeedRetryException {
           }
         }
 
+        driverCxt.finished(tskRun);
+
         if (SessionState.get() != null) {
           SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
               Keys.TASK_RET_CODE, String.valueOf(exitVal));
@@ -1529,6 +1533,8 @@ public void launchTask(Task<? extends Serializable> tsk, String queryId, boolean
     TaskResult tskRes = new TaskResult();
     TaskRunner tskRun = new TaskRunner(tsk, tskRes);
 
+    cxt.prepare(tskRun);
+
     // Launch Task
     if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
       // Launch it in the parallel mode, as a separate thread only for MR tasks
diff --git ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 1c84523..c51a9c8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -18,13 +18,25 @@
 
 package org.apache.hadoop.hive.ql;
 
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.NodeUtils;
+import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.mapred.JobConf;
-
 /**
  * DriverContext.
 *
@@ -38,6 +50,8 @@
 
   Context ctx;
 
+  final Map<String, StatsTask> statsTasks = new HashMap<String, StatsTask>(1);
+
   public DriverContext() {
     this.runnable = null;
     this.ctx = null;
@@ -82,5 +96,42 @@ public Context getCtx() {
   public void incCurJobNo(int amount) {
     this.curJobNo = this.curJobNo + amount;
   }
-
+
+  public void prepare(QueryPlan plan) {
+    // extract stats keys from StatsTask
+    List<Task<? extends Serializable>> rootTasks = plan.getRootTasks();
+    NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function<StatsTask>() {
+      public void apply(StatsTask statsTask) {
+        statsTasks.put(statsTask.getWork().getAggKey(), statsTask);
+      }
+    });
+  }
+
+  public void prepare(TaskRunner runner) {
+  }
+
+  public void finished(TaskRunner runner) {
+    if (statsTasks.isEmpty() || !(runner.getTask() instanceof MapRedTask)) {
+      return;
+    }
+    MapRedTask mapredTask = (MapRedTask) runner.getTask();
+
+    MapWork mapWork = mapredTask.getWork().getMapWork();
+    ReduceWork reduceWork = mapredTask.getWork().getReduceWork();
+    List<Operator> operators = new ArrayList<Operator>(mapWork.getAliasToWork().values());
+    if (reduceWork != null) {
+      operators.add(reduceWork.getReducer());
+    }
+    final List<String> statKeys = new ArrayList<String>(1);
+    NodeUtils.iterate(operators, FileSinkOperator.class, new Function<FileSinkOperator>() {
+      public void apply(FileSinkOperator fsOp) {
+        if (fsOp.getConf().isGatherStats()) {
+          statKeys.add(fsOp.getConf().getStatsAggPrefix());
+        }
+      }
+    });
+    for (String statKey : statKeys) {
+      statsTasks.get(statKey).getWork().setSourceTask(mapredTask);
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index b6c09eb..fb0b772 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -924,7 +924,7 @@ private void publishStats() throws HiveException {
       postfix = Utilities.join(lbSpec, taskID);
     }
     prefix = Utilities.join(prefix, spSpec, dpSpec);
-    prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, postfix.length());
+    prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength);
 
     String key = Utilities.join(prefix, postfix);
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java
new file mode 100644
index 0000000..5aae311
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.lib.Node;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Simple node iterating utils
+ */
+public class NodeUtils {
+
+  public static <T> void iterateTask(Collection<? extends Task<?>> tasks, Class<T> clazz, Function<T> function) {
+    Set<Task> visited = new HashSet<Task>();
+    for (Task<?> task : tasks) {
+      iterateTask(task, clazz, function, visited);
+    }
+    return;
+  }
+
+  private static <T> void iterateTask(Task<?> task, Class<T> clazz, Function<T> function, Set<Task> visited) {
+    if (!visited.add(task)) {
+      return;
+    }
+    if (clazz.isInstance(task)) {
+      function.apply(clazz.cast(task));
+    }
+    // this is for ConditionalTask
+    if (task.getDependentTasks() != null) {
+      for (Task<?> dependent : task.getDependentTasks()) {
+        iterateTask(dependent, clazz, function, visited);
+      }
+    }
+  }
+
+  public static <T> void iterate(Collection<? extends Node> nodes, Class<T> clazz, Function<T> function) {
+    Set<Node> visited = new HashSet<Node>();
+    for (Node task : nodes) {
+      iterate(task, clazz, function, visited);
+    }
+    return;
+  }
+
+  private static <T> void iterate(Node node, Class<T> clazz, Function<T> function, Set<Node> visited) {
+    if (!visited.add(node)) {
+      return;
+    }
+    if (clazz.isInstance(node)) {
+      function.apply(clazz.cast(node));
+    }
+    if (node.getChildren() != null) {
+      for (Node child : node.getChildren()) {
+        iterate(child, clazz, function, visited);
+      }
+    }
+  }
+
+  public static interface Function<T> {
+    void apply(T argument);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index a22a4c2..5f6b100 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -71,7 +71,7 @@ public StatsTask() {
   }
 
   @Override
-  protected void receiveFeed(FeedType feedType, Object feedValue) {
+  public void receiveFeed(FeedType feedType, Object feedValue) {
     // this method should be called by MoveTask when there are dynamic partitions generated
     if (feedType == FeedType.DYNAMIC_PARTITIONS) {
       assert feedValue instanceof List;
@@ -171,9 +171,7 @@ private int aggregateStats() {
 
         if (statsAggregator != null) {
           String prefix = getAggregationPrefix(counterStat, table, null);
-          String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength, 0);
-          updateStats(statsAggregator, parameters, aggKey, atomic);
-          statsAggregator.cleanUp(aggKey);
+          updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic);
         }
 
         updateQuickStats(wh, parameters, tTable.getSd());
@@ -207,9 +205,7 @@ private int aggregateStats() {
 
           if (statsAggregator != null) {
             String prefix = getAggregationPrefix(counterStat, table, partn);
-            String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength, 0);
-            updateStats(statsAggregator, parameters, aggKey, atomic);
-            statsAggregator.cleanUp(aggKey);
+            updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic);
           }
 
           updateQuickStats(wh, parameters, tPart.getSd());
@@ -296,7 +292,10 @@ private boolean existStats(Map<String, String> parameters) {
   }
 
   private void updateStats(StatsAggregator statsAggregator,
-      Map<String, String> parameters, String aggKey, boolean atomic) throws HiveException {
+      Map<String, String> parameters, String prefix, int maxPrefixLength, boolean atomic)
+      throws HiveException {
+
+    String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength);
 
     for (String statType : StatsSetupConst.statsRequireCompute) {
       String value = statsAggregator.aggregateStats(aggKey, statType);
@@ -317,6 +316,7 @@ private void updateStats(StatsAggregator statsAggregator,
         }
       }
     }
+    statsAggregator.cleanUp(aggKey);
   }
 
   private void updateQuickStats(Warehouse wh, Map<String, String> parameters,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 0e3cfe7..cec5f43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -287,18 +287,17 @@ private void publishStats() throws HiveException {
     String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf));
     Map<String, String> statsToPublish = new HashMap<String, String>();
 
+    boolean counterStats = statsPublisher instanceof CounterStatsPublisher;
+
     for (String pspecs : stats.keySet()) {
       statsToPublish.clear();
       String prefix = Utilities.join(conf.getStatsAggPrefix(), pspecs);
 
-      String key;
       int maxKeyLength = conf.getMaxStatsKeyPrefixLength();
-      if (statsPublisher instanceof CounterStatsPublisher) {
-        key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, 0);
-      } else {
+      String key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength);
+      if (!(statsPublisher instanceof CounterStatsPublisher)) {
         // stats publisher except counter type needs postfix 'taskID'
-        prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, taskID.length());
-        key = prefix + taskID;
+        key = Utilities.join(prefix, taskID);
       }
       for(String statType : stats.get(pspecs).getStoredStats()) {
         statsToPublish.put(statType, Long.toString(stats.get(pspecs).getStat(statType)));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b9b5b4a..75c79c2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2401,12 +2401,11 @@ public static StatsPublisher getStatsPublisher(JobConf jc) {
    * @param maxPrefixLength
    * @return
    */
-  public static String getHashedStatsPrefix(String statsPrefix,
-      int maxPrefixLength, int postfixLength) {
+  public static String getHashedStatsPrefix(String statsPrefix, int maxPrefixLength) {
     // todo: this might return possibly longer prefix than
     // maxPrefixLength (if set) when maxPrefixLength - postfixLength < 17,
     // which would make stat values invalid (especially for 'counter' type)
-    if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength - postfixLength) {
+    if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength) {
       try {
         MessageDigest digester = MessageDigest.getInstance("MD5");
         digester.update(statsPrefix.getBytes());
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
index e319fe4..12953af 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.stats.CounterStatsPublisher;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.CombineHiveKey;
@@ -139,11 +140,13 @@ private void publishStats() throws HiveException {
       throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
     }
 
+    int maxPrefixLength = StatsFactory.getMaxPrefixLength(jc);
     // construct key used to store stats in intermediate db
-    String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(jc));
-    String keyPrefix = Utilities.getHashedStatsPrefix(
-        statsAggKeyPrefix, StatsFactory.getMaxPrefixLength(jc), taskID.length());
-    String key = keyPrefix + taskID;
+    String key = Utilities.getHashedStatsPrefix(statsAggKeyPrefix, maxPrefixLength);
+    if (!(statsPublisher instanceof CounterStatsPublisher)) {
+      String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(jc));
+      key = Utilities.join(key, taskID);
+    }
 
     // construct statistics to be stored
     Map<String, String> statsToPublish = new HashMap<String, String>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
index 0f0e825..66d4d4a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
@@ -50,6 +50,9 @@
 
   private boolean isPartialScanAnalyzeCommand = false;
 
+  // sourceTask for TS is not changed (currently) but that of FS might be changed
+  // by various optimizers (auto.convert.join, for example)
+  // so this is set by DriverContext at runtime
   private transient Task sourceTask;
 
   public StatsWork() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
index 2fb880d..3216cf3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
@@ -30,6 +30,7 @@
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSDBCLASS;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX_RESERVE_LENGTH;
 
 /**
  * A factory of stats publisher and aggregator implementations of the
@@ -51,6 +52,10 @@ public static int getMaxPrefixLength(Configuration conf) {
       maxPrefixLength = maxPrefixLength < 0 ? groupNameMax :
           Math.min(maxPrefixLength, groupNameMax);
     }
+    if (maxPrefixLength > 0) {
+      int reserve = HiveConf.getIntVar(conf, HIVE_STATS_KEY_PREFIX_RESERVE_LENGTH);
+      return reserve < 0 ? maxPrefixLength : maxPrefixLength - reserve;
+    }
     return maxPrefixLength;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index 41e237f..384b49e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.stats;
 
 import java.util.List;
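
Note (reviewer summary, not part of the patch): instead of reserving postfix room at hashing time, StatsFactory.getMaxPrefixLength() now subtracts the new hive.stats.key.prefix.reserve.length from the prefix budget up front, and DriverContext records each StatsTask by its aggregation key in prepare(QueryPlan) and, in finished(TaskRunner), walks the completed MapRedTask's FileSink operators to bind that task as the StatsTask's source. The sketch below is illustrative only; it mirrors the publisher-side key construction in PartialScanMapper.publishStats() after this patch, and the names jc, statsAggKeyPrefix and statsPublisher stand in for the mapper's JobConf, configured aggregation prefix and stats publisher.

    // Illustrative sketch, not part of the patch.
    // Effective prefix budget for counter stats, using the defaults named in this patch:
    // min(hive.stats.key.prefix.max.length=150, mapreduce.job.counters.group.name.max=128)
    //   - hive.stats.key.prefix.reserve.length=24  ->  104 characters.
    int maxPrefixLength = StatsFactory.getMaxPrefixLength(jc);
    // The prefix is MD5-hashed only when it exceeds that budget; counter-type keys stop here,
    // leaving the reserved room for the list-bucketing (LB) spec / task ID postfix.
    String key = Utilities.getHashedStatsPrefix(statsAggKeyPrefix, maxPrefixLength);
    if (!(statsPublisher instanceof CounterStatsPublisher)) {
      // every publisher except the counter type still appends the per-task postfix
      String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(jc));
      key = Utilities.join(key, taskID);
    }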