diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a78b72f..84ee78f 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -642,6 +642,7 @@
     // higher compute cost.
     HIVE_STATS_NDV_ERROR("hive.stats.ndv.error", (float)20.0),
     HIVE_STATS_KEY_PREFIX_MAX_LENGTH("hive.stats.key.prefix.max.length", 150),
+    HIVE_STATS_KEY_PREFIX_RESERVE_LENGTH("hive.stats.key.prefix.reserve.length", 24),
     HIVE_STATS_KEY_PREFIX("hive.stats.key.prefix", ""), // internal usage only
     // if length of variable length data type cannot be determined this length will be used.
     HIVE_STATS_MAX_VARIABLE_LENGTH("hive.stats.max.variable.length", 100),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 7cd8a1f..24f61f3 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1307,6 +1307,16 @@
     exceeds a certain length, a hash of the key is used instead. If the value < 0 then hashing
     is never used, if the value >= 0 then hashing is used only when the key prefixes length
     exceeds that value. The key prefix is defined as everything preceding the task ID in the key.
+    For counter type stats, the length is further capped by mapreduce.job.counters.group.name.max, which defaults to 128.
   </description>
 </property>

+<property>
+  <name>hive.stats.key.prefix.reserve.length</name>
+  <value>24</value>
+  <description>
+    Reserved length for the postfix of the stats key. Only meaningful for counter type stats
+    databases, and should be long enough to hold the LB spec part of the key.
+  </description>
+</property>
+
@@ -2127,7 +2137,7 @@
 <property>
   <name>hive.metastore.integral.jdo.pushdown</name>
-  <value>false</value>
+  <value>false</value>
   <description>
     Allow JDO query pushdown for integral partition columns in metastore. Off by default. This
     improves metastore perf for integral columns, especially if there's a large number of partitions.
   </description>
 </property>
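[Reviewer note] A hedged, standalone illustration of the substitution these settings govern (not patch code; the sample path and the hex encoding are assumptions): once a key prefix exceeds the effective cap, an MD5 hash of the prefix is used in its place, so a counter group name stays bounded no matter how deep the partition spec is.

    import java.security.MessageDigest;

    public class HashedPrefixDemo {
      public static void main(String[] args) throws Exception {
        // made-up, deeply partitioned prefix of well over 104 characters
        String prefix = "pfile:/warehouse/web_logs/ds=2013-08-01/hr=00/min=00/"
            + "country=US/state=CA/city=SF/device=mobile/browser=firefox/";
        int cap = 104;  // min(150, 128) - 24 with the defaults quoted above

        if (prefix.length() > cap) {
          MessageDigest md5 = MessageDigest.getInstance("MD5");
          StringBuilder hex = new StringBuilder();
          for (byte b : md5.digest(prefix.getBytes())) {
            hex.append(String.format("%02x", b));
          }
          prefix = hex.toString();  // 32 hex chars, always within the cap
        }
        System.out.println(prefix.length() + " : " + prefix);
      }
    }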
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index a314ce7..f317946 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -27,7 +27,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Stack;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,7 +62,6 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -926,7 +924,7 @@ private void publishStats() throws HiveException {
       postfix = Utilities.join(lbSpec, taskID);
     }
     prefix = Utilities.join(prefix, spSpec, dpSpec);
-    prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, postfix.length());
+    prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength);

     String key = Utilities.join(prefix, postfix);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index a22a4c2..597358a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -171,9 +171,7 @@ private int aggregateStats() {
       if (statsAggregator != null) {
         String prefix = getAggregationPrefix(counterStat, table, null);
-        String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength, 0);
-        updateStats(statsAggregator, parameters, aggKey, atomic);
-        statsAggregator.cleanUp(aggKey);
+        updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic);
       }
       updateQuickStats(wh, parameters, tTable.getSd());
@@ -207,9 +205,7 @@ private int aggregateStats() {
         if (statsAggregator != null) {
           String prefix = getAggregationPrefix(counterStat, table, partn);
-          String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength, 0);
-          updateStats(statsAggregator, parameters, aggKey, atomic);
-          statsAggregator.cleanUp(aggKey);
+          updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic);
         }
         updateQuickStats(wh, parameters, tPart.getSd());
@@ -296,7 +292,10 @@ private boolean existStats(Map<String, String> parameters) {
   }

   private void updateStats(StatsAggregator statsAggregator,
-      Map<String, String> parameters, String aggKey, boolean atomic) throws HiveException {
+      Map<String, String> parameters, String prefix, int maxPrefixLength, boolean atomic)
+      throws HiveException {
+
+    String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength);

     for (String statType : StatsSetupConst.statsRequireCompute) {
       String value = statsAggregator.aggregateStats(aggKey, statType);
@@ -317,6 +316,7 @@ private void updateStats(StatsAggregator statsAggregator,
         }
       }
     }
+    statsAggregator.cleanUp(aggKey);
   }

   private void updateQuickStats(Warehouse wh, Map<String, String> parameters,
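[Reviewer note] The StatsTask refactor is more than cosmetic: key derivation and cleanUp move inside updateStats, so the table-level and partition-level call sites can no longer hash differently or skip the cleanup. A minimal sketch of the consolidated flow, using a stand-in aggregator interface and a stub in place of Utilities.getHashedStatsPrefix (both are assumptions, not the real signatures):

    // Stand-in for org.apache.hadoop.hive.ql.stats.StatsAggregator (abbreviated).
    interface StatsAggregator {
      String aggregateStats(String aggKey, String statType);
      boolean cleanUp(String keyPrefix);
    }

    class UpdateStatsFlow {
      // Callers pass the raw prefix plus the length cap; the aggregation key
      // is derived in exactly one place, and cleanUp runs on that same key.
      static void updateStats(StatsAggregator agg, String prefix, int maxPrefixLength) {
        String aggKey = hashIfTooLong(prefix, maxPrefixLength);
        for (String statType : new String[] {"numRows", "rawDataSize"}) {
          String value = agg.aggregateStats(aggKey, statType);
          // ... fold value into the table/partition parameters, as the patch does ...
        }
        agg.cleanUp(aggKey);  // previously a separate call each call site had to remember
      }

      // Stub for Utilities.getHashedStatsPrefix; the real method uses MD5 (see below).
      static String hashIfTooLong(String prefix, int max) {
        return (max >= 0 && prefix.length() > max)
            ? Integer.toHexString(prefix.hashCode()) : prefix;
      }
    }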
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 0e3cfe7..cec5f43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -287,18 +287,17 @@ private void publishStats() throws HiveException {
     String taskID =
         Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf));
     Map<String, String> statsToPublish = new HashMap<String, String>();

+    boolean counterStats = statsPublisher instanceof CounterStatsPublisher;
+
     for (String pspecs : stats.keySet()) {
       statsToPublish.clear();
       String prefix = Utilities.join(conf.getStatsAggPrefix(), pspecs);
-      String key;
       int maxKeyLength = conf.getMaxStatsKeyPrefixLength();
-      if (statsPublisher instanceof CounterStatsPublisher) {
-        key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, 0);
-      } else {
+      String key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength);
+      if (!counterStats) {
         // stats publisher except counter type needs postfix 'taskID'
-        prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, taskID.length());
-        key = prefix + taskID;
+        key = Utilities.join(key, taskID);
       }
       for (String statType : stats.get(pspecs).getStoredStats()) {
         statsToPublish.put(statType, Long.toString(stats.get(pspecs).getStat(statType)));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 18e261a..e644d56 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2387,12 +2387,11 @@ public static StatsPublisher getStatsPublisher(JobConf jc) {
    * @param maxPrefixLength
    * @return
    */
-  public static String getHashedStatsPrefix(String statsPrefix,
-      int maxPrefixLength, int postfixLength) {
-    // todo: this might return possibly longer prefix than
-    // maxPrefixLength (if set) when maxPrefixLength - postfixLength < 17,
-    // which would make stat values invalid (especially for 'counter' type)
-    if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength - postfixLength) {
+  public static String getHashedStatsPrefix(String statsPrefix, int maxPrefixLength) {
+    // todo: this might return a prefix longer than maxPrefixLength (if set)
+    // when maxPrefixLength < 17, which would make stat values invalid
+    // (especially for 'counter' type)
+    if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength) {
       try {
         MessageDigest digester = MessageDigest.getInstance("MD5");
         digester.update(statsPrefix.getBytes());
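[Reviewer note] Dropping the postfixLength argument is the heart of the patch, and a worked example shows why (the 104-char cap and the 7-char postfix below are assumed figures): with the old signature, the publishing side hashed based on maxPrefixLength - taskID.length() while the aggregating side passed 0, so a prefix whose length fell between the two thresholds was hashed when published but not when aggregated, and the keys never matched.

    public class KeySymmetryCheck {
      static final int CAP = 104;  // assumed effective max prefix length

      // Old behavior: the hash decision depended on a caller-supplied postfix
      // length, so publisher and aggregator could disagree on the same prefix.
      static boolean wouldHashOld(String prefix, int postfixLength) {
        return prefix.length() > CAP - postfixLength;
      }

      // New behavior: the decision depends on the prefix and the cap alone.
      static boolean wouldHashNew(String prefix) {
        return prefix.length() > CAP;
      }

      public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++) {
          sb.append('p');
        }
        String prefix = sb.toString();  // 100 chars: between CAP - 7 and CAP

        System.out.println(wouldHashOld(prefix, 7));  // true:  publishing side hashes
        System.out.println(wouldHashOld(prefix, 0));  // false: aggregating side does not
        System.out.println(wouldHashNew(prefix));     // false on both sides -> keys agree
      }
    }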
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
index e319fe4..12953af 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.stats.CounterStatsPublisher;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.CombineHiveKey;
@@ -139,11 +140,13 @@ private void publishStats() throws HiveException {
       throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
     }

+    int maxPrefixLength = StatsFactory.getMaxPrefixLength(jc);
     // construct key used to store stats in intermediate db
-    String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(jc));
-    String keyPrefix = Utilities.getHashedStatsPrefix(
-        statsAggKeyPrefix, StatsFactory.getMaxPrefixLength(jc), taskID.length());
-    String key = keyPrefix + taskID;
+    String key = Utilities.getHashedStatsPrefix(statsAggKeyPrefix, maxPrefixLength);
+    if (!(statsPublisher instanceof CounterStatsPublisher)) {
+      String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(jc));
+      key = Utilities.join(key, taskID);
+    }

     // construct statistics to be stored
     Map<String, String> statsToPublish = new HashMap<String, String>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
index 2fb880d..3216cf3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
@@ -30,6 +30,7 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSDBCLASS;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX_RESERVE_LENGTH;

 /**
  * A factory of stats publisher and aggregator implementations of the
@@ -51,6 +52,10 @@ public static int getMaxPrefixLength(Configuration conf) {
       maxPrefixLength = maxPrefixLength < 0 ? groupNameMax :
           Math.min(maxPrefixLength, groupNameMax);
     }
+    if (maxPrefixLength > 0) {
+      int reserve = HiveConf.getIntVar(conf, HIVE_STATS_KEY_PREFIX_RESERVE_LENGTH);
+      return reserve < 0 ? maxPrefixLength : maxPrefixLength - reserve;
+    }
     return maxPrefixLength;
   }
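[Reviewer note] Tying the new getMaxPrefixLength back to the defaults, a standalone recomputation of the effective budget (the class name and printed trace are mine): with hive.stats.key.prefix.max.length=150, the counter group name limit of 128, and a reserve of 24, any prefix longer than 104 characters gets hashed, leaving 24 characters of headroom for the LB spec postfix of counter keys.

    public class MaxPrefixLengthSketch {
      public static void main(String[] args) {
        int maxPrefixLength = 150;  // hive.stats.key.prefix.max.length default
        int groupNameMax = 128;     // mapreduce.job.counters.group.name.max default
        int reserve = 24;           // hive.stats.key.prefix.reserve.length default

        // counter-type stats: cap by the counter group name limit first ...
        maxPrefixLength = maxPrefixLength < 0 ? groupNameMax
            : Math.min(maxPrefixLength, groupNameMax);              // -> 128

        // ... then set aside room for the postfix, unless reserve is negative
        int effective = reserve < 0 ? maxPrefixLength : maxPrefixLength - reserve;
        System.out.println(effective);  // 104: longer prefixes get hashed
      }
    }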