diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java index b3ceff1..e119dd8 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java @@ -650,7 +650,7 @@ public static void fillColumnStatisticsData(String colType, ColumnStatisticsData } } - private static Decimal createThriftDecimal(String s) { + public static Decimal createThriftDecimal(String s) { BigDecimal d = new BigDecimal(s); return new Decimal(ByteBuffer.wrap(d.unscaledValue().toByteArray()), (short)d.scale()); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index 9ec7cd5..e0b449b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.metastore.hbase; import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; @@ -88,7 +90,7 @@ /** * Utility functions */ -class HBaseUtils { +public class HBaseUtils { final static Charset ENCODING = StandardCharsets.UTF_8; final static char KEY_SEPARATOR = '\u0001'; @@ -1421,4 +1423,8 @@ static String deserializeMasterKey(byte[] value) throws InvalidProtocolBufferExc b[7] = (byte)(v >>> 0); return b; } + + public static double getDoubleValue(Decimal decimal) { + return new BigDecimal(new BigInteger(decimal.getUnscaled()), decimal.getScale()).doubleValue(); + } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java index f1d2e50..18f8afc 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java @@ -85,12 +85,12 @@ private StatsCache(final Configuration conf) { @Override public AggrStats load(StatsCacheKey key) throws Exception { int numBitVectors = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf); + boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); HBaseReadWrite hrw = HBaseReadWrite.getInstance(); AggrStats aggrStats = hrw.getAggregatedStats(key.hashed); if (aggrStats == null) { misses.incr(); ColumnStatsAggregator aggregator = null; - ColumnStatisticsObj statsObj = null; aggrStats = new AggrStats(); LOG.debug("Unable to find aggregated stats for " + key.colName + ", aggregating"); List css = hrw.getPartitionStatistics(key.dbName, key.tableName, @@ -98,19 +98,13 @@ public AggrStats load(StatsCacheKey key) throws Exception { Collections.singletonList(key.colName)); if (css != null && css.size() > 0) { aggrStats.setPartsFound(css.size()); - for (ColumnStatistics cs : css) { - for (ColumnStatisticsObj cso : cs.getStatsObj()) { - if (statsObj == null) { - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(key.colName, - cso.getColType(), cso.getStatsData().getSetField()); - } - if (aggregator == null) { - aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator( - cso.getStatsData().getSetField(), numBitVectors); - } - 
aggregator.aggregate(statsObj, cso); - } + if (aggregator == null) { + aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css.iterator() + .next().getStatsObj().iterator().next().getStatsData().getSetField(), + numBitVectors, useDensityFunctionForNDVEstimation); } + ColumnStatisticsObj statsObj = aggregator + .aggregate(key.colName, key.partNames, css); aggrStats.addToColStats(statsObj); me.put(key, aggrStats); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java index 40340dd..d81d612 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BinaryColumnStatsAggregator.java @@ -19,17 +19,46 @@ package org.apache.hadoop.hive.metastore.hbase.stats; +import java.util.List; + import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.MetaException; -public class BinaryColumnStatsAggregator extends ColumnStatsAggregator{ +public class BinaryColumnStatsAggregator extends ColumnStatsAggregator { @Override - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { - BinaryColumnStatsData aggregateData = aggregateColStats.getStatsData().getBinaryStats(); - BinaryColumnStatsData newData = newColStats.getStatsData().getBinaryStats(); - aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen())); - aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen())); - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + public ColumnStatisticsObj aggregate(String colName, List partNames, + List css) throws MetaException { + ColumnStatisticsObj statsObj = null; + BinaryColumnStatsData aggregateData = null; + String colType = null; + for (ColumnStatistics cs : css) { + if (cs.getStatsObjSize() != 1) { + throw new MetaException( + "The number of columns should be exactly one in aggrStats, but found " + + cs.getStatsObjSize()); + } + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + if (statsObj == null) { + colType = cso.getColType(); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso + .getStatsData().getSetField()); + } + BinaryColumnStatsData newData = cso.getStatsData().getBinaryStats(); + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen())); + aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } + } + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); + columnStatisticsData.setBinaryStats(aggregateData); + statsObj.setStatsData(columnStatisticsData); + return statsObj; } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java index 735d965..e796df2 100644 --- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/BooleanColumnStatsAggregator.java @@ -19,17 +19,47 @@ package org.apache.hadoop.hive.metastore.hbase.stats; +import java.util.List; + import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.MetaException; public class BooleanColumnStatsAggregator extends ColumnStatsAggregator { @Override - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { - BooleanColumnStatsData aggregateData = aggregateColStats.getStatsData().getBooleanStats(); - BooleanColumnStatsData newData = newColStats.getStatsData().getBooleanStats(); - aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues()); - aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses()); - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + public ColumnStatisticsObj aggregate(String colName, List partNames, + List css) throws MetaException { + ColumnStatisticsObj statsObj = null; + BooleanColumnStatsData aggregateData = null; + String colType = null; + for (ColumnStatistics cs : css) { + if (cs.getStatsObjSize() != 1) { + throw new MetaException( + "The number of columns should be exactly one in aggrStats, but found " + + cs.getStatsObjSize()); + } + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + if (statsObj == null) { + colType = cso.getColType(); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso + .getStatsData().getSetField()); + } + BooleanColumnStatsData newData = cso.getStatsData().getBooleanStats(); + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues()); + aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses()); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } + } + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); + columnStatisticsData.setBooleanStats(aggregateData); + statsObj.setStatsData(columnStatisticsData); + return statsObj; } + } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java index 694e53b..31955b4 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregator.java @@ -19,10 +19,16 @@ package org.apache.hadoop.hive.metastore.hbase.stats; -import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator; +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.MetaException; public abstract class ColumnStatsAggregator { - NumDistinctValueEstimator ndvEstimator = null; - public abstract void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats); + public int numBitVectors; + public boolean 
useDensityFunctionForNDVEstimation; + + public abstract ColumnStatisticsObj aggregate(String colName, List partNames, + List css) throws MetaException; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java index 8eb127b..daf8569 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.metastore.hbase.stats; -import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator; import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; @@ -35,7 +34,7 @@ private ColumnStatsAggregatorFactory() { } - public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int numBitVectors) { + public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int numBitVectors, boolean useDensityFunctionForNDVEstimation) { ColumnStatsAggregator agg; switch (type) { case BOOLEAN_STATS: @@ -59,9 +58,8 @@ public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, int n default: throw new RuntimeException("Woh, bad. Unknown stats type " + type.toString()); } - if (numBitVectors > 0) { - agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors); - } + agg.numBitVectors = numBitVectors; + agg.useDensityFunctionForNDVEstimation = useDensityFunctionForNDVEstimation; return agg; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java index 50f4325..36b2c9c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DecimalColumnStatsAggregator.java @@ -19,33 +19,333 @@ package org.apache.hadoop.hive.metastore.hbase.stats; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator; +import org.apache.hadoop.hive.metastore.StatObjectConverter; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.Decimal; import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.hbase.HBaseUtils; -public class DecimalColumnStatsAggregator extends ColumnStatsAggregator { +public class DecimalColumnStatsAggregator extends ColumnStatsAggregator implements + IExtrapolatePartStatus { @Override - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { - DecimalColumnStatsData aggregateData = aggregateColStats.getStatsData().getDecimalStats(); - DecimalColumnStatsData newData = newColStats.getStatsData().getDecimalStats(); - Decimal lowValue = aggregateData.getLowValue() != null - && 
(aggregateData.getLowValue().compareTo(newData.getLowValue()) > 0) ? aggregateData - .getLowValue() : newData.getLowValue(); - aggregateData.setLowValue(lowValue); - Decimal highValue = aggregateData.getHighValue() != null - && (aggregateData.getHighValue().compareTo(newData.getHighValue()) > 0) ? aggregateData - .getHighValue() : newData.getHighValue(); - aggregateData.setHighValue(highValue); - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); - if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) { - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + public ColumnStatisticsObj aggregate(String colName, List partNames, + List css) throws MetaException { + ColumnStatisticsObj statsObj = null; + + // check if all the ColumnStatisticsObjs contain stats and all the ndv are + // bitvectors + boolean doAllPartitionContainStats = partNames.size() == css.size(); + boolean isNDVBitVectorSet = true; + String colType = null; + for (ColumnStatistics cs : css) { + if (cs.getStatsObjSize() != 1) { + throw new MetaException( + "The number of columns should be exactly one in aggrStats, but found " + + cs.getStatsObjSize()); + } + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + if (statsObj == null) { + colType = cso.getColType(); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso + .getStatsData().getSetField()); + } + if (numBitVectors <= 0 || !cso.getStatsData().getDecimalStats().isSetBitVectors() + || cso.getStatsData().getDecimalStats().getBitVectors().length() == 0) { + isNDVBitVectorSet = false; + break; + } + } + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); + if (doAllPartitionContainStats || css.size() < 2) { + DecimalColumnStatsData aggregateData = null; + long lowerBound = 0; + long higherBound = 0; + double densityAvgSum = 0.0; + NumDistinctValueEstimator ndvEstimator = null; + if (isNDVBitVectorSet) { + ndvEstimator = new NumDistinctValueEstimator(numBitVectors); + } + for (ColumnStatistics cs : css) { + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats(); + if (useDensityFunctionForNDVEstimation) { + lowerBound = Math.max(lowerBound, newData.getNumDVs()); + higherBound += newData.getNumDVs(); + densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils + .getDoubleValue(newData.getLowValue())) / newData.getNumDVs(); + } + if (isNDVBitVectorSet) { + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + } + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + if (HBaseUtils.getDoubleValue(aggregateData.getLowValue()) < HBaseUtils + .getDoubleValue(newData.getLowValue())) { + aggregateData.setLowValue(aggregateData.getLowValue()); + } else { + aggregateData.setLowValue(newData.getLowValue()); + } + if (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) > HBaseUtils + .getDoubleValue(newData.getHighValue())) { + aggregateData.setHighValue(aggregateData.getHighValue()); + } else { + aggregateData.setHighValue(newData.getHighValue()); + } + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } + } + if (isNDVBitVectorSet) { + // if all the ColumnStatisticsObjs contain bitvectors, we do not need to + // use 
uniform distribution assumption because we can merge bitvectors + // to get a good estimation. + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + } else { + if (useDensityFunctionForNDVEstimation) { + // We have estimation, lowerbound and higherbound. We use estimation + // if it is between lowerbound and higherbound. + double densityAvg = densityAvgSum / partNames.size(); + long estimation = (long) ((HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils + .getDoubleValue(aggregateData.getLowValue())) / densityAvg); + if (estimation < lowerBound) { + aggregateData.setNumDVs(lowerBound); + } else if (estimation > higherBound) { + aggregateData.setNumDVs(higherBound); + } else { + aggregateData.setNumDVs(estimation); + } + } else { + // Without useDensityFunctionForNDVEstimation, we just use the + // default one, which is the max of all the partitions and it is + // already done. + } + } + columnStatisticsData.setDecimalStats(aggregateData); + } else { + // we need extrapolation + Map indexMap = new HashMap(); + for (int index = 0; index < partNames.size(); index++) { + indexMap.put(partNames.get(index), index); + } + Map adjustedIndexMap = new HashMap(); + Map adjustedStatsMap = new HashMap(); + // while we scan the css, we also get the densityAvg, lowerbound and + // higerbound when useDensityFunctionForNDVEstimation is true. + double densityAvgSum = 0.0; + if (!isNDVBitVectorSet) { + // if not every partition uses bitvector for ndv, we just fall back to + // the traditional extrapolation methods. + for (ColumnStatistics cs : css) { + String partName = cs.getStatsDesc().getPartName(); + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats(); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += (HBaseUtils.getDoubleValue(newData.getHighValue()) - HBaseUtils + .getDoubleValue(newData.getLowValue())) / newData.getNumDVs(); + } + adjustedIndexMap.put(partName, (double) indexMap.get(partName)); + adjustedStatsMap.put(partName, cso.getStatsData()); + } + } else { + // we first merge all the adjacent bitvectors that we could merge and + // derive new partition names and index. + NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors); + StringBuilder pseudoPartName = new StringBuilder(); + double pseudoIndexSum = 0; + int length = 0; + int curIndex = -1; + DecimalColumnStatsData aggregateData = null; + for (ColumnStatistics cs : css) { + String partName = cs.getStatsDesc().getPartName(); + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats(); + // newData.isSetBitVectors() should be true for sure because we + // already checked it before. + if (indexMap.get(partName) != curIndex) { + // There is bitvector, but it is not adjacent to the previous ones. 
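// ---------------------------------------------------------------------------
// For reference: the non-bitvector branch above estimates NDV from an average
// "value density". A minimal standalone sketch of that calculation follows;
// the PartStats type and method names are illustrative, not the metastore's
// Thrift classes, though the formula mirrors the lowerBound/higherBound logic
// in this patch.
// ---------------------------------------------------------------------------
import java.util.Arrays;
import java.util.List;

public class NdvDensityEstimate {
  static class PartStats {
    final double low, high;
    final long numDVs;
    PartStats(double low, double high, long numDVs) {
      this.low = low; this.high = high; this.numDVs = numDVs;
    }
  }

  // Aggregate NDV = (globalHigh - globalLow) / averageDensity, clamped between
  // the largest per-partition NDV (lower bound) and the sum of per-partition
  // NDVs (upper bound).
  static long estimateNdv(List<PartStats> parts) {
    double globalLow = Double.MAX_VALUE, globalHigh = -Double.MAX_VALUE, densitySum = 0.0;
    long lowerBound = 0, higherBound = 0;
    for (PartStats p : parts) {
      globalLow = Math.min(globalLow, p.low);
      globalHigh = Math.max(globalHigh, p.high);
      lowerBound = Math.max(lowerBound, p.numDVs);
      higherBound += p.numDVs;
      densitySum += (p.high - p.low) / p.numDVs;   // per-partition value density
    }
    double densityAvg = densitySum / parts.size();
    long estimation = (long) ((globalHigh - globalLow) / densityAvg);
    return Math.min(Math.max(estimation, lowerBound), higherBound);
  }

  public static void main(String[] args) {
    List<PartStats> parts = Arrays.asList(new PartStats(0, 100, 50), new PartStats(50, 200, 60));
    System.out.println(estimateNdv(parts));   // 88, clamped to [60, 110]
  }
}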
+ if (length > 0) { + // we have to set ndv + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + ColumnStatisticsData csd = new ColumnStatisticsData(); + csd.setDecimalStats(aggregateData); + adjustedStatsMap.put(pseudoPartName.toString(), csd); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils + .getDoubleValue(aggregateData.getLowValue())) / aggregateData.getNumDVs(); + } + // reset everything + pseudoPartName = new StringBuilder(); + pseudoIndexSum = 0; + length = 0; + } + aggregateData = null; + } + curIndex = indexMap.get(partName); + pseudoPartName.append(partName); + pseudoIndexSum += curIndex; + length++; + curIndex++; + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + if (HBaseUtils.getDoubleValue(aggregateData.getLowValue()) < HBaseUtils + .getDoubleValue(newData.getLowValue())) { + aggregateData.setLowValue(aggregateData.getLowValue()); + } else { + aggregateData.setLowValue(newData.getLowValue()); + } + if (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) > HBaseUtils + .getDoubleValue(newData.getHighValue())) { + aggregateData.setHighValue(aggregateData.getHighValue()); + } else { + aggregateData.setHighValue(newData.getHighValue()); + } + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + } + if (length > 0) { + // we have to set ndv + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + ColumnStatisticsData csd = new ColumnStatisticsData(); + csd.setDecimalStats(aggregateData); + adjustedStatsMap.put(pseudoPartName.toString(), csd); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += (HBaseUtils.getDoubleValue(aggregateData.getHighValue()) - HBaseUtils + .getDoubleValue(aggregateData.getLowValue())) / aggregateData.getNumDVs(); + } + } + } + extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, + adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + } + statsObj.setStatsData(columnStatisticsData); + return statsObj; + } + + @Override + public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, + int numPartsWithStats, Map adjustedIndexMap, + Map adjustedStatsMap, double densityAvg) { + int rightBorderInd = numParts; + DecimalColumnStatsData extrapolateDecimalData = new DecimalColumnStatsData(); + Map extractedAdjustedStatsMap = new HashMap<>(); + for (Map.Entry entry : adjustedStatsMap.entrySet()) { + extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDecimalStats()); + } + List> list = new LinkedList>( + extractedAdjustedStatsMap.entrySet()); + // get the lowValue + Collections.sort(list, new Comparator>() { + public int compare(Map.Entry o1, + Map.Entry o2) { + return o1.getValue().getLowValue().compareTo(o2.getValue().getLowValue()); + } + }); + double minInd = adjustedIndexMap.get(list.get(0).getKey()); + double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + double lowValue = 0; + double min = HBaseUtils.getDoubleValue(list.get(0).getValue().getLowValue()); + double max = HBaseUtils.getDoubleValue(list.get(list.size() - 1).getValue().getLowValue()); + if (minInd == maxInd) { + lowValue = min; + } else if (minInd < 
maxInd) { + // left border is the min + lowValue = (max - (max - min) * maxInd / (maxInd - minInd)); + } else { + // right border is the min + lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd)); + } + + // get the highValue + Collections.sort(list, new Comparator>() { + public int compare(Map.Entry o1, + Map.Entry o2) { + return o1.getValue().getHighValue().compareTo(o2.getValue().getHighValue()); + } + }); + minInd = adjustedIndexMap.get(list.get(0).getKey()); + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + double highValue = 0; + min = HBaseUtils.getDoubleValue(list.get(0).getValue().getHighValue()); + max = HBaseUtils.getDoubleValue(list.get(list.size() - 1).getValue().getHighValue()); + if (minInd == maxInd) { + highValue = min; + } else if (minInd < maxInd) { + // right border is the max + highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd)); + } else { + // left border is the max + highValue = (min + (max - min) * minInd / (minInd - maxInd)); + } + + // get the #nulls + long numNulls = 0; + for (Map.Entry entry : extractedAdjustedStatsMap.entrySet()) { + numNulls += entry.getValue().getNumNulls(); + } + // we scale up sumNulls based on the number of partitions + numNulls = numNulls * numParts / numPartsWithStats; + + // get the ndv + long ndv = 0; + long ndvMin = 0; + long ndvMax = 0; + Collections.sort(list, new Comparator>() { + public int compare(Map.Entry o1, + Map.Entry o2) { + return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1; + } + }); + long lowerBound = list.get(list.size() - 1).getValue().getNumDVs(); + long higherBound = 0; + for (Map.Entry entry : list) { + higherBound += entry.getValue().getNumDVs(); + } + if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) { + ndv = (long) ((highValue - lowValue) / densityAvg); + if (ndv < lowerBound) { + ndv = lowerBound; + } else if (ndv > higherBound) { + ndv = higherBound; + } } else { - ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), - ndvEstimator.getnumBitVectors())); - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - aggregateData.setBitVectors(ndvEstimator.serialize().toString()); + minInd = adjustedIndexMap.get(list.get(0).getKey()); + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + ndvMin = list.get(0).getValue().getNumDVs(); + ndvMax = list.get(list.size() - 1).getValue().getNumDVs(); + if (minInd == maxInd) { + ndv = ndvMin; + } else if (minInd < maxInd) { + // right border is the max + ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd)); + } else { + // left border is the max + ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd)); + } } + extrapolateDecimalData.setLowValue(StatObjectConverter.createThriftDecimal(String + .valueOf(lowValue))); + extrapolateDecimalData.setHighValue(StatObjectConverter.createThriftDecimal(String + .valueOf(highValue))); + extrapolateDecimalData.setNumNulls(numNulls); + extrapolateDecimalData.setNumDVs(ndv); + extrapolateData.setDecimalStats(extrapolateDecimalData); } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java index d945ec2..a88ef84 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java +++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/DoubleColumnStatsAggregator.java @@ -19,26 +19,307 @@ package org.apache.hadoop.hive.metastore.hbase.stats; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; +import org.apache.hadoop.hive.metastore.api.MetaException; -public class DoubleColumnStatsAggregator extends ColumnStatsAggregator { +public class DoubleColumnStatsAggregator extends ColumnStatsAggregator implements + IExtrapolatePartStatus { @Override - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { - DoubleColumnStatsData aggregateData = aggregateColStats.getStatsData().getDoubleStats(); - DoubleColumnStatsData newData = newColStats.getStatsData().getDoubleStats(); - aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); - aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue())); - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); - if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) { - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + public ColumnStatisticsObj aggregate(String colName, List partNames, + List css) throws MetaException { + ColumnStatisticsObj statsObj = null; + + // check if all the ColumnStatisticsObjs contain stats and all the ndv are + // bitvectors + boolean doAllPartitionContainStats = partNames.size() == css.size(); + boolean isNDVBitVectorSet = true; + String colType = null; + for (ColumnStatistics cs : css) { + if (cs.getStatsObjSize() != 1) { + throw new MetaException( + "The number of columns should be exactly one in aggrStats, but found " + + cs.getStatsObjSize()); + } + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + if (statsObj == null) { + colType = cso.getColType(); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso + .getStatsData().getSetField()); + } + if (numBitVectors <= 0 || !cso.getStatsData().getDoubleStats().isSetBitVectors() + || cso.getStatsData().getDoubleStats().getBitVectors().length() == 0) { + isNDVBitVectorSet = false; + break; + } + } + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); + if (doAllPartitionContainStats || css.size() < 2) { + DoubleColumnStatsData aggregateData = null; + long lowerBound = 0; + long higherBound = 0; + double densityAvgSum = 0.0; + NumDistinctValueEstimator ndvEstimator = null; + if (isNDVBitVectorSet) { + ndvEstimator = new NumDistinctValueEstimator(numBitVectors); + } + for (ColumnStatistics cs : css) { + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats(); + if (useDensityFunctionForNDVEstimation) { + lowerBound = Math.max(lowerBound, newData.getNumDVs()); + higherBound += newData.getNumDVs(); + densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); + } + if (isNDVBitVectorSet) { + ndvEstimator.mergeEstimators(new 
NumDistinctValueEstimator(newData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + } + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); + aggregateData + .setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } + } + if (isNDVBitVectorSet) { + // if all the ColumnStatisticsObjs contain bitvectors, we do not need to + // use uniform distribution assumption because we can merge bitvectors + // to get a good estimation. + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + } else { + if (useDensityFunctionForNDVEstimation) { + // We have estimation, lowerbound and higherbound. We use estimation + // if it is between lowerbound and higherbound. + double densityAvg = densityAvgSum / partNames.size(); + long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg); + if (estimation < lowerBound) { + aggregateData.setNumDVs(lowerBound); + } else if (estimation > higherBound) { + aggregateData.setNumDVs(higherBound); + } else { + aggregateData.setNumDVs(estimation); + } + } else { + // Without useDensityFunctionForNDVEstimation, we just use the + // default one, which is the max of all the partitions and it is + // already done. + } + } + columnStatisticsData.setDoubleStats(aggregateData); + } else { + // we need extrapolation + Map indexMap = new HashMap(); + for (int index = 0; index < partNames.size(); index++) { + indexMap.put(partNames.get(index), index); + } + Map adjustedIndexMap = new HashMap(); + Map adjustedStatsMap = new HashMap(); + // while we scan the css, we also get the densityAvg, lowerbound and + // higerbound when useDensityFunctionForNDVEstimation is true. + double densityAvgSum = 0.0; + if (!isNDVBitVectorSet) { + // if not every partition uses bitvector for ndv, we just fall back to + // the traditional extrapolation methods. + for (ColumnStatistics cs : css) { + String partName = cs.getStatsDesc().getPartName(); + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats(); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); + } + adjustedIndexMap.put(partName, (double) indexMap.get(partName)); + adjustedStatsMap.put(partName, cso.getStatsData()); + } + } else { + // we first merge all the adjacent bitvectors that we could merge and + // derive new partition names and index. + NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors); + StringBuilder pseudoPartName = new StringBuilder(); + double pseudoIndexSum = 0; + int length = 0; + int curIndex = -1; + DoubleColumnStatsData aggregateData = null; + for (ColumnStatistics cs : css) { + String partName = cs.getStatsDesc().getPartName(); + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats(); + // newData.isSetBitVectors() should be true for sure because we + // already checked it before. + if (indexMap.get(partName) != curIndex) { + // There is bitvector, but it is not adjacent to the previous ones. 
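// ---------------------------------------------------------------------------
// For reference: the "pseudo partition" grouping used here merges partitions
// that have NDV bitvectors and are adjacent in the partition list into one
// synthetic partition whose adjusted index is the average of the merged
// indices. A standalone sketch of just the grouping follows; names are
// illustrative, and the real code also merges the bitvectors and the
// min/max/null counts while it walks each run.
// ---------------------------------------------------------------------------
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PseudoPartitionGrouping {
  public static Map<String, Double> adjustedIndexes(List<String> partNames,
      List<Integer> partsWithStatsIdx) {
    Map<String, Double> adjustedIndexMap = new HashMap<>();
    StringBuilder pseudoPartName = new StringBuilder();
    double pseudoIndexSum = 0;
    int length = 0;
    int curIndex = -1;                   // index we expect if the current run continues
    for (int idx : partsWithStatsIdx) {
      if (idx != curIndex && length > 0) {
        // the run is broken: close the current pseudo partition
        adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
        pseudoPartName = new StringBuilder();
        pseudoIndexSum = 0;
        length = 0;
      }
      curIndex = idx;
      pseudoPartName.append(partNames.get(idx));
      pseudoIndexSum += curIndex;
      length++;
      curIndex++;                        // next adjacent index
    }
    if (length > 0) {
      adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length);
    }
    return adjustedIndexMap;
  }

  public static void main(String[] args) {
    List<String> parts = Arrays.asList("p0", "p1", "p2", "p3", "p4");
    // stats exist for p0, p1 and p3: {p0,p1} merge to index 0.5, p3 stays at 3.0
    System.out.println(adjustedIndexes(parts, Arrays.asList(0, 1, 3)));
  }
}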
+ if (length > 0) { + // we have to set ndv + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + ColumnStatisticsData csd = new ColumnStatisticsData(); + csd.setDoubleStats(aggregateData); + adjustedStatsMap.put(pseudoPartName.toString(), csd); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + } + // reset everything + pseudoPartName = new StringBuilder(); + pseudoIndexSum = 0; + length = 0; + } + aggregateData = null; + } + curIndex = indexMap.get(partName); + pseudoPartName.append(partName); + pseudoIndexSum += curIndex; + length++; + curIndex++; + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); + aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), + newData.getHighValue())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + } + if (length > 0) { + // we have to set ndv + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + ColumnStatisticsData csd = new ColumnStatisticsData(); + csd.setDoubleStats(aggregateData); + adjustedStatsMap.put(pseudoPartName.toString(), csd); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + } + } + } + extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, + adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + } + statsObj.setStatsData(columnStatisticsData); + return statsObj; + } + + @Override + public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, + int numPartsWithStats, Map adjustedIndexMap, + Map adjustedStatsMap, double densityAvg) { + int rightBorderInd = numParts; + DoubleColumnStatsData extrapolateDoubleData = new DoubleColumnStatsData(); + Map extractedAdjustedStatsMap = new HashMap<>(); + for (Map.Entry entry : adjustedStatsMap.entrySet()) { + extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDoubleStats()); + } + List> list = new LinkedList>( + extractedAdjustedStatsMap.entrySet()); + // get the lowValue + Collections.sort(list, new Comparator>() { + public int compare(Map.Entry o1, + Map.Entry o2) { + return o1.getValue().getLowValue() < o2.getValue().getLowValue() ? -1 : 1; + } + }); + double minInd = adjustedIndexMap.get(list.get(0).getKey()); + double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + double lowValue = 0; + double min = list.get(0).getValue().getLowValue(); + double max = list.get(list.size() - 1).getValue().getLowValue(); + if (minInd == maxInd) { + lowValue = min; + } else if (minInd < maxInd) { + // left border is the min + lowValue = (max - (max - min) * maxInd / (maxInd - minInd)); + } else { + // right border is the min + lowValue = (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd)); + } + + // get the highValue + Collections.sort(list, new Comparator>() { + public int compare(Map.Entry o1, + Map.Entry o2) { + return o1.getValue().getHighValue() < o2.getValue().getHighValue() ? 
-1 : 1; + } + }); + minInd = adjustedIndexMap.get(list.get(0).getKey()); + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + double highValue = 0; + min = list.get(0).getValue().getHighValue(); + max = list.get(list.size() - 1).getValue().getHighValue(); + if (minInd == maxInd) { + highValue = min; + } else if (minInd < maxInd) { + // right border is the max + highValue = (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd)); + } else { + // left border is the max + highValue = (min + (max - min) * minInd / (minInd - maxInd)); + } + + // get the #nulls + long numNulls = 0; + for (Map.Entry entry : extractedAdjustedStatsMap.entrySet()) { + numNulls += entry.getValue().getNumNulls(); + } + // we scale up sumNulls based on the number of partitions + numNulls = numNulls * numParts / numPartsWithStats; + + // get the ndv + long ndv = 0; + long ndvMin = 0; + long ndvMax = 0; + Collections.sort(list, new Comparator>() { + public int compare(Map.Entry o1, + Map.Entry o2) { + return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? -1 : 1; + } + }); + long lowerBound = list.get(list.size() - 1).getValue().getNumDVs(); + long higherBound = 0; + for (Map.Entry entry : list) { + higherBound += entry.getValue().getNumDVs(); + } + if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) { + ndv = (long) ((highValue - lowValue) / densityAvg); + if (ndv < lowerBound) { + ndv = lowerBound; + } else if (ndv > higherBound) { + ndv = higherBound; + } } else { - ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), - ndvEstimator.getnumBitVectors())); - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - aggregateData.setBitVectors(ndvEstimator.serialize().toString()); + minInd = adjustedIndexMap.get(list.get(0).getKey()); + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + ndvMin = list.get(0).getValue().getNumDVs(); + ndvMax = list.get(list.size() - 1).getValue().getNumDVs(); + if (minInd == maxInd) { + ndv = ndvMin; + } else if (minInd < maxInd) { + // right border is the max + ndv = (long) (ndvMin + (ndvMax - ndvMin) * (rightBorderInd - minInd) / (maxInd - minInd)); + } else { + // left border is the max + ndv = (long) (ndvMin + (ndvMax - ndvMin) * minInd / (minInd - maxInd)); + } } + extrapolateDoubleData.setLowValue(lowValue); + extrapolateDoubleData.setHighValue(highValue); + extrapolateDoubleData.setNumNulls(numNulls); + extrapolateDoubleData.setNumDVs(ndv); + extrapolateData.setDoubleStats(extrapolateDoubleData); } + } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java new file mode 100644 index 0000000..99af060 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/IExtrapolatePartStatus.java @@ -0,0 +1,30 @@ +package org.apache.hadoop.hive.metastore.hbase.stats; + +import java.util.Map; + +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; + +public interface IExtrapolatePartStatus { + // The following function will extrapolate the stats when the column stats of + // some partitions are missing. 
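// ---------------------------------------------------------------------------
// For reference: the extrapolation the implementing classes perform is
// essentially a linear fit over the (adjusted) partition indices, extended to
// the border of the partition range. A standalone sketch of the high-value
// case follows; the names are illustrative, only the formula mirrors the
// aggregators in this patch.
// ---------------------------------------------------------------------------
public class HighValueExtrapolation {
  /**
   * @param min      smallest per-partition high value
   * @param max      largest per-partition high value
   * @param minInd   adjusted index of the partition holding min
   * @param maxInd   adjusted index of the partition holding max
   * @param numParts total number of partitions (the right border of the range)
   */
  static double extrapolateHigh(double min, double max, double minInd, double maxInd,
      int numParts) {
    if (minInd == maxInd) {
      return min;
    } else if (minInd < maxInd) {
      // values grow to the right, so the right border carries the extrapolated max
      return min + (max - min) * (numParts - minInd) / (maxInd - minInd);
    } else {
      // values grow to the left, so the left border (index 0) carries the max
      return min + (max - min) * minInd / (minInd - maxInd);
    }
  }

  public static void main(String[] args) {
    // high values 10 and 20 seen at indices 1 and 3 of a 5-partition table:
    // the fitted line reaches 10 + 10 * (5 - 1) / (3 - 1) = 30 at the right border
    System.out.println(extrapolateHigh(10, 20, 1, 3, 5));   // 30.0
  }
}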
+ /** + * @param extrapolateData + * it will carry back the specific stats, e.g., DOUBLE_STATS or + * LONG_STATS + * @param numParts + * the total number of partitions + * @param numPartsWithStats + * the number of partitions that have stats + * @param adjustedIndexMap + * the partition name to index map + * @param adjustedStatsMap + * the partition name to its stats map + * @param densityAvg + * the average of ndv density, which is useful when + * useDensityFunctionForNDVEstimation is true. + */ + public abstract void extrapolate(ColumnStatisticsData extrapolateData, int numParts, + int numPartsWithStats, Map adjustedIndexMap, + Map adjustedStatsMap, double densityAvg); + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java index 068dd00..8ac6561 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/LongColumnStatsAggregator.java @@ -19,26 +19,305 @@ package org.apache.hadoop.hive.metastore.hbase.stats; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.MetaException; -public class LongColumnStatsAggregator extends ColumnStatsAggregator { +public class LongColumnStatsAggregator extends ColumnStatsAggregator implements + IExtrapolatePartStatus { @Override - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { - LongColumnStatsData aggregateData = aggregateColStats.getStatsData().getLongStats(); - LongColumnStatsData newData = newColStats.getStatsData().getLongStats(); - aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); - aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue())); - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); - if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) { - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + public ColumnStatisticsObj aggregate(String colName, List partNames, + List css) throws MetaException { + ColumnStatisticsObj statsObj = null; + + // check if all the ColumnStatisticsObjs contain stats and all the ndv are + // bitvectors + boolean doAllPartitionContainStats = partNames.size() == css.size(); + boolean isNDVBitVectorSet = true; + String colType = null; + for (ColumnStatistics cs : css) { + if (cs.getStatsObjSize() != 1) { + throw new MetaException( + "The number of columns should be exactly one in aggrStats, but found " + + cs.getStatsObjSize()); + } + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + if (statsObj == null) { + colType = cso.getColType(); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso + .getStatsData().getSetField()); + } + if (numBitVectors <= 0 || !cso.getStatsData().getLongStats().isSetBitVectors() + || 
cso.getStatsData().getLongStats().getBitVectors().length() == 0) { + isNDVBitVectorSet = false; + break; + } + } + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); + if (doAllPartitionContainStats || css.size() < 2) { + LongColumnStatsData aggregateData = null; + long lowerBound = 0; + long higherBound = 0; + double densityAvgSum = 0.0; + NumDistinctValueEstimator ndvEstimator = null; + if (isNDVBitVectorSet) { + ndvEstimator = new NumDistinctValueEstimator(numBitVectors); + } + for (ColumnStatistics cs : css) { + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + LongColumnStatsData newData = cso.getStatsData().getLongStats(); + if (useDensityFunctionForNDVEstimation) { + lowerBound = Math.max(lowerBound, newData.getNumDVs()); + higherBound += newData.getNumDVs(); + densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); + } + if (isNDVBitVectorSet) { + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + } + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); + aggregateData + .setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } + } + if (isNDVBitVectorSet) { + // if all the ColumnStatisticsObjs contain bitvectors, we do not need to + // use uniform distribution assumption because we can merge bitvectors + // to get a good estimation. + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + } else { + if (useDensityFunctionForNDVEstimation) { + // We have estimation, lowerbound and higherbound. We use estimation + // if it is between lowerbound and higherbound. + double densityAvg = densityAvgSum / partNames.size(); + long estimation = (long) ((aggregateData.getHighValue() - aggregateData.getLowValue()) / densityAvg); + if (estimation < lowerBound) { + aggregateData.setNumDVs(lowerBound); + } else if (estimation > higherBound) { + aggregateData.setNumDVs(higherBound); + } else { + aggregateData.setNumDVs(estimation); + } + } else { + // Without useDensityFunctionForNDVEstimation, we just use the + // default one, which is the max of all the partitions and it is + // already done. + } + } + columnStatisticsData.setLongStats(aggregateData); } else { - ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), - ndvEstimator.getnumBitVectors())); - aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - aggregateData.setBitVectors(ndvEstimator.serialize().toString()); + // we need extrapolation + Map indexMap = new HashMap(); + for (int index = 0; index < partNames.size(); index++) { + indexMap.put(partNames.get(index), index); + } + Map adjustedIndexMap = new HashMap(); + Map adjustedStatsMap = new HashMap(); + // while we scan the css, we also get the densityAvg, lowerbound and + // higerbound when useDensityFunctionForNDVEstimation is true. + double densityAvgSum = 0.0; + if (!isNDVBitVectorSet) { + // if not every partition uses bitvector for ndv, we just fall back to + // the traditional extrapolation methods. 
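// ---------------------------------------------------------------------------
// For reference: whichever path is taken, the later extrapolate() step scales
// counts that were only observed on a subset of partitions and clamps the
// density-based NDV. A small standalone sketch follows; names are
// illustrative, not the metastore types.
// ---------------------------------------------------------------------------
public class ExtrapolationScaling {
  // Nulls seen on the partitions that have stats are scaled up linearly to the
  // full partition count.
  static long scaleNumNulls(long numNullsWithStats, int numParts, int numPartsWithStats) {
    return numNullsWithStats * numParts / numPartsWithStats;
  }

  // Density-based NDV for the extrapolated [lowValue, highValue] range, clamped
  // between the largest per-partition NDV and the sum of all per-partition NDVs.
  static long densityNdv(double lowValue, double highValue, double densityAvg,
      long lowerBound, long higherBound) {
    long ndv = (long) ((highValue - lowValue) / densityAvg);
    return Math.min(Math.max(ndv, lowerBound), higherBound);
  }

  public static void main(String[] args) {
    System.out.println(scaleNumNulls(40, 10, 4));          // 100: 40 nulls on 4 of 10 partitions
    System.out.println(densityNdv(0, 300, 2.0, 80, 200));  // 150
  }
}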
+ for (ColumnStatistics cs : css) { + String partName = cs.getStatsDesc().getPartName(); + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + LongColumnStatsData newData = cso.getStatsData().getLongStats(); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); + } + adjustedIndexMap.put(partName, (double) indexMap.get(partName)); + adjustedStatsMap.put(partName, cso.getStatsData()); + } + } else { + // we first merge all the adjacent bitvectors that we could merge and + // derive new partition names and index. + NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors); + StringBuilder pseudoPartName = new StringBuilder(); + double pseudoIndexSum = 0; + int length = 0; + int curIndex = -1; + LongColumnStatsData aggregateData = null; + for (ColumnStatistics cs : css) { + String partName = cs.getStatsDesc().getPartName(); + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + LongColumnStatsData newData = cso.getStatsData().getLongStats(); + // newData.isSetBitVectors() should be true for sure because we + // already checked it before. + if (indexMap.get(partName) != curIndex) { + // There is bitvector, but it is not adjacent to the previous ones. + if (length > 0) { + // we have to set ndv + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + ColumnStatisticsData csd = new ColumnStatisticsData(); + csd.setLongStats(aggregateData); + adjustedStatsMap.put(pseudoPartName.toString(), csd); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + } + // reset everything + pseudoPartName = new StringBuilder(); + pseudoIndexSum = 0; + length = 0; + } + aggregateData = null; + } + curIndex = indexMap.get(partName); + pseudoPartName.append(partName); + pseudoIndexSum += curIndex; + length++; + curIndex++; + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); + aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), + newData.getHighValue())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + } + if (length > 0) { + // we have to set ndv + adjustedIndexMap.put(pseudoPartName.toString(), pseudoIndexSum / length); + aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); + ColumnStatisticsData csd = new ColumnStatisticsData(); + csd.setLongStats(aggregateData); + adjustedStatsMap.put(pseudoPartName.toString(), csd); + if (useDensityFunctionForNDVEstimation) { + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + } + } + } + extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, + adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } + statsObj.setStatsData(columnStatisticsData); + return statsObj; } + + @Override + public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, + int numPartsWithStats, Map adjustedIndexMap, + Map adjustedStatsMap, double densityAvg) { + int rightBorderInd = numParts; + LongColumnStatsData extrapolateLongData = new LongColumnStatsData(); + Map 
extractedAdjustedStatsMap = new HashMap<>(); + for (Map.Entry entry : adjustedStatsMap.entrySet()) { + extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getLongStats()); + } + List> list = new LinkedList>( + extractedAdjustedStatsMap.entrySet()); + // get the lowValue + Collections.sort(list, new Comparator>() { + public int compare(Map.Entry o1, + Map.Entry o2) { + return o1.getValue().getLowValue() < o2.getValue().getLowValue() ? -1 : 1; + } + }); + double minInd = adjustedIndexMap.get(list.get(0).getKey()); + double maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + long lowValue = 0; + long min = list.get(0).getValue().getLowValue(); + long max = list.get(list.size() - 1).getValue().getLowValue(); + if (minInd == maxInd) { + lowValue = min; + } else if (minInd < maxInd) { + // left border is the min + lowValue = (long) (max - (max - min) * maxInd / (maxInd - minInd)); + } else { + // right border is the min + lowValue = (long) (max - (max - min) * (rightBorderInd - maxInd) / (minInd - maxInd)); + } + + // get the highValue + Collections.sort(list, new Comparator>() { + public int compare(Map.Entry o1, + Map.Entry o2) { + return o1.getValue().getHighValue() < o2.getValue().getHighValue() ? -1 : 1; + } + }); + minInd = adjustedIndexMap.get(list.get(0).getKey()); + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + long highValue = 0; + min = list.get(0).getValue().getHighValue(); + max = list.get(list.size() - 1).getValue().getHighValue(); + if (minInd == maxInd) { + highValue = min; + } else if (minInd < maxInd) { + // right border is the max + highValue = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd)); + } else { + // left border is the max + highValue = (long) (min + (max - min) * minInd / (minInd - maxInd)); + } + + // get the #nulls + long numNulls = 0; + for (Map.Entry entry : extractedAdjustedStatsMap.entrySet()) { + numNulls += entry.getValue().getNumNulls(); + } + // we scale up sumNulls based on the number of partitions + numNulls = numNulls * numParts / numPartsWithStats; + + // get the ndv + long ndv = 0; + Collections.sort(list, new Comparator>() { + public int compare(Map.Entry o1, + Map.Entry o2) { + return o1.getValue().getNumDVs() < o2.getValue().getNumDVs() ? 
-1 : 1; + } + }); + long lowerBound = list.get(list.size() - 1).getValue().getNumDVs(); + long higherBound = 0; + for (Map.Entry entry : list) { + higherBound += entry.getValue().getNumDVs(); + } + if (useDensityFunctionForNDVEstimation && densityAvg != 0.0) { + ndv = (long) ((highValue - lowValue) / densityAvg); + if (ndv < lowerBound) { + ndv = lowerBound; + } else if (ndv > higherBound) { + ndv = higherBound; + } + } else { + minInd = adjustedIndexMap.get(list.get(0).getKey()); + maxInd = adjustedIndexMap.get(list.get(list.size() - 1).getKey()); + min = list.get(0).getValue().getNumDVs(); + max = list.get(list.size() - 1).getValue().getNumDVs(); + if (minInd == maxInd) { + ndv = min; + } else if (minInd < maxInd) { + // right border is the max + ndv = (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd)); + } else { + // left border is the max + ndv = (long) (min + (max - min) * minInd / (minInd - maxInd)); + } + } + extrapolateLongData.setLowValue(lowValue); + extrapolateLongData.setHighValue(highValue); + extrapolateLongData.setNumNulls(numNulls); + extrapolateLongData.setNumDVs(ndv); + extrapolateData.setLongStats(extrapolateLongData); + } + } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java index aeb6c39..2aa4046 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/StringColumnStatsAggregator.java @@ -19,26 +19,87 @@ package org.apache.hadoop.hive.metastore.hbase.stats; +import java.util.List; + import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; public class StringColumnStatsAggregator extends ColumnStatsAggregator { @Override - public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { - StringColumnStatsData aggregateData = aggregateColStats.getStatsData().getStringStats(); - StringColumnStatsData newData = newColStats.getStatsData().getStringStats(); - aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen())); - aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen())); - aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); - if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) { - aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); - } else { - ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), - ndvEstimator.getnumBitVectors())); + public ColumnStatisticsObj aggregate(String colName, List partNames, + List css) throws MetaException { + ColumnStatisticsObj statsObj = null; + + // check if all the ColumnStatisticsObjs contain stats and all the ndv are + // bitvectors. Only when both of the conditions are true, we merge bit + // vectors. Otherwise, just use the maximum function. 
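// ---------------------------------------------------------------------------
// For reference: when bitvectors are unavailable, string stats fall back to a
// max/sum merge rather than an estimator merge. A standalone sketch of that
// fallback follows; the POJO is illustrative, not the Thrift-generated
// StringColumnStatsData.
// ---------------------------------------------------------------------------
import java.util.Arrays;
import java.util.List;

public class StringStatsFallbackMerge {
  static class StringStats {
    long maxColLen; double avgColLen; long numNulls; long numDVs;
    StringStats(long maxColLen, double avgColLen, long numNulls, long numDVs) {
      this.maxColLen = maxColLen; this.avgColLen = avgColLen;
      this.numNulls = numNulls; this.numDVs = numDVs;
    }
  }

  static StringStats merge(List<StringStats> parts) {
    StringStats agg = null;
    for (StringStats p : parts) {
      if (agg == null) {
        agg = new StringStats(p.maxColLen, p.avgColLen, p.numNulls, p.numDVs);
      } else {
        agg.maxColLen = Math.max(agg.maxColLen, p.maxColLen);
        agg.avgColLen = Math.max(agg.avgColLen, p.avgColLen);  // upper bound, not a true average
        agg.numNulls += p.numNulls;
        agg.numDVs = Math.max(agg.numDVs, p.numDVs);            // NDVs cannot be summed safely
      }
    }
    return agg;
  }

  public static void main(String[] args) {
    StringStats merged = merge(Arrays.asList(
        new StringStats(10, 4.2, 5, 100), new StringStats(12, 3.9, 7, 80)));
    System.out.println(merged.maxColLen + " " + merged.numNulls + " " + merged.numDVs); // 12 12 100
  }
}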
+ boolean doAllPartitionContainStats = partNames.size() == css.size(); + boolean isNDVBitVectorSet = true; + String colType = null; + for (ColumnStatistics cs : css) { + if (cs.getStatsObjSize() != 1) { + throw new MetaException( + "The number of columns should be exactly one in aggrStats, but found " + + cs.getStatsObjSize()); + } + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + if (statsObj == null) { + colType = cso.getColType(); + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso + .getStatsData().getSetField()); + } + if (numBitVectors <= 0 || !cso.getStatsData().getStringStats().isSetBitVectors() + || cso.getStatsData().getStringStats().getBitVectors().length() == 0) { + isNDVBitVectorSet = false; + break; + } + } + ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); + if (doAllPartitionContainStats && isNDVBitVectorSet) { + StringColumnStatsData aggregateData = null; + NumDistinctValueEstimator ndvEstimator = new NumDistinctValueEstimator(numBitVectors); + for (ColumnStatistics cs : css) { + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + StringColumnStatsData newData = cso.getStatsData().getStringStats(); + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData + .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen())); + aggregateData + .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } + } aggregateData.setNumDVs(ndvEstimator.estimateNumDistinctValues()); - aggregateData.setBitVectors(ndvEstimator.serialize().toString()); + columnStatisticsData.setStringStats(aggregateData); + } else { + StringColumnStatsData aggregateData = null; + for (ColumnStatistics cs : css) { + ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + StringColumnStatsData newData = cso.getStatsData().getStringStats(); + if (aggregateData == null) { + aggregateData = newData.deepCopy(); + } else { + aggregateData + .setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen())); + aggregateData + .setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } + } + columnStatisticsData.setStringStats(aggregateData); } + statsObj.setStatsData(columnStatisticsData); + return statsObj; } + } diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java new file mode 100644 index 0000000..f4e55ed --- /dev/null +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsExtrapolation.java @@ -0,0 +1,717 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.hbase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.StatObjectConverter; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; + +public class TestHBaseAggregateStatsExtrapolation { + private static final Logger LOG = LoggerFactory + .getLogger(TestHBaseAggregateStatsExtrapolation.class.getName()); + + @Mock + HTableInterface htable; + private HBaseStore store; + SortedMap rows = new TreeMap<>(); + + // NDV will be 3 for the bitVectors + String bitVectors = "{0, 4, 5, 7}{0, 1}{0, 1, 2}{0, 1, 4}{0}{0, 2}{0, 3}{0, 2, 3, 4}{0, 1, 4}{0, 1}{0}{0, 1, 3, 8}{0, 2}{0, 2}{0, 9}{0, 1, 4}"; + + @Before + public void before() throws IOException { + MockitoAnnotations.initMocks(this); + HiveConf conf = new HiveConf(); + conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true); + store = MockUtils.init(conf, htable, rows); + store.backdoor().getStatsCache().resetCounters(); + } + + private static interface Checker { + void checkStats(AggrStats aggrStats) throws Exception; + } + + @Test + public void allPartitionsHaveBitVectorStatusLong() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. 
emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. emptyMap()); + store.addPartition(part); + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(10, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1009, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + } + + @Test + public void allPartitionsHaveBitVectorStatusDecimal() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col1_decimal", "decimal", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1_decimal"); + obj.setColType("decimal"); + ColumnStatisticsData data = new ColumnStatisticsData(); + DecimalColumnStatsData dcsd = new DecimalColumnStatsData(); + dcsd.setHighValue(StatObjectConverter.createThriftDecimal("" + (1000 + i))); + dcsd.setLowValue(StatObjectConverter.createThriftDecimal("" + (-1000 - i))); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors); + data.setDecimalStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(10, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1_decimal", cso.getColName()); + Assert.assertEquals("decimal", cso.getColType()); + DecimalColumnStatsData lcsd = cso.getStatsData().getDecimalStats(); + Assert.assertEquals(1009, HBaseUtils.getDoubleValue(lcsd.getHighValue()), 0.01); + Assert.assertEquals(-1009, HBaseUtils.getDoubleValue(lcsd.getLowValue()), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col1_decimal")); + statChecker.checkStats(aggrStats); + } + + @Test + public void allPartitionsHaveBitVectorStatusDouble() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col1_double", "double", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1_double"); + obj.setColType("double"); + ColumnStatisticsData data = new ColumnStatisticsData(); + DoubleColumnStatsData dcsd = new DoubleColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors); + data.setDoubleStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(10, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1_double", cso.getColName()); + Assert.assertEquals("double", cso.getColType()); + DoubleColumnStatsData lcsd = cso.getStatsData().getDoubleStats(); + Assert.assertEquals(1009, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col1_double")); + statChecker.checkStats(aggrStats); + } + + @Test + public void allPartitionsHaveBitVectorStatusString() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col1_string", "string", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1_string"); + obj.setColType("string"); + ColumnStatisticsData data = new ColumnStatisticsData(); + StringColumnStatsData dcsd = new StringColumnStatsData(); + dcsd.setAvgColLen(i + 1); + dcsd.setMaxColLen(i + 10); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors); + data.setStringStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(10, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1_string", cso.getColName()); + Assert.assertEquals("string", cso.getColType()); + StringColumnStatsData lcsd = cso.getStatsData().getStringStats(); + Assert.assertEquals(10, lcsd.getAvgColLen(), 0.01); + Assert.assertEquals(19, lcsd.getMaxColLen(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col1_string")); + statChecker.checkStats(aggrStats); + } + + @Test + public void noPartitionsHaveBitVectorStatus() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col2", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col2"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(10, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col2", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1009, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(90, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col2")); + statChecker.checkStats(aggrStats); + } + + @Test + public void TwoEndsOfPartitionsHaveBitVectorStatus() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col3", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + if (i < 2 || i > 7) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col3"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i); + dcsd.setBitVectors(bitVectors); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(4, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col3", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1010, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col3")); + statChecker.checkStats(aggrStats); + } + + @Test + public void MiddleOfPartitionsHaveBitVectorStatus() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col4", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + if (i > 2 && i < 7) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col4"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i); + dcsd.setBitVectors(bitVectors); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(4, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col4", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1006, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1006, lcsd.getLowValue(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col4")); + statChecker.checkStats(aggrStats); + } + + @Test + public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusLong() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col5", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col5"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i); + dcsd.setBitVectors(bitVectors); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(6, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col5", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1010, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01); + Assert.assertEquals(40, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col5")); + statChecker.checkStats(aggrStats); + } + + @Test + public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusDouble() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col5_double", "double", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col5_double"); + obj.setColType("double"); + ColumnStatisticsData data = new ColumnStatisticsData(); + DoubleColumnStatsData dcsd = new DoubleColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i); + dcsd.setBitVectors(bitVectors); + data.setDoubleStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(6, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col5_double", cso.getColName()); + Assert.assertEquals("double", cso.getColType()); + DoubleColumnStatsData lcsd = cso.getStatsData().getDoubleStats(); + Assert.assertEquals(1010, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01); + Assert.assertEquals(40, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col5_double")); + statChecker.checkStats(aggrStats); + } +} diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java new file mode 100644 index 0000000..62918be --- /dev/null +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsNDVUniformDist.java @@ -0,0 +1,581 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.metastore.hbase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.StatObjectConverter; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; + +public class TestHBaseAggregateStatsNDVUniformDist { + private static final Logger LOG = LoggerFactory + .getLogger(TestHBaseAggregateStatsNDVUniformDist.class.getName()); + + @Mock + HTableInterface htable; + private HBaseStore store; + SortedMap rows = new TreeMap<>(); + + // NDV will be 3 for bitVectors[0] and 12 for bitVectors[1] + String bitVectors[] = { + "{0, 4, 5, 7}{0, 1}{0, 1, 2}{0, 1, 4}{0}{0, 2}{0, 3}{0, 2, 3, 4}{0, 1, 4}{0, 1}{0}{0, 1, 3, 8}{0, 2}{0, 2}{0, 9}{0, 1, 4}", + "{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}{1, 2}" }; + + @Before + public void before() throws IOException { + MockitoAnnotations.initMocks(this); + HiveConf conf = new HiveConf(); + conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true); + conf.setBoolean(HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION.varname, true); + store = MockUtils.init(conf, htable, rows); + store.backdoor().getStatsCache().resetCounters(); + } + + private static interface Checker { + void checkStats(AggrStats aggrStats) throws Exception; + } + + @Test + public void allPartitionsHaveBitVectorStatus() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. 
emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. emptyMap()); + store.addPartition(part); + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors[0]); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(10, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1009, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + } + + @Test + public void noPartitionsHaveBitVectorStatus() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col2", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col2"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(10, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col2", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1009, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1009, lcsd.getLowValue(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(91, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col2")); + statChecker.checkStats(aggrStats); + } + + @Test + public void TwoEndsOfPartitionsHaveBitVectorStatus() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col3", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + if (i < 2 || i > 7) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col3"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors[i / 5]); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(4, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col3", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1010, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(12, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col3")); + statChecker.checkStats(aggrStats); + } + + @Test + public void MiddleOfPartitionsHaveBitVectorStatus() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col4", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + if (i > 2 && i < 7) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col4"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors[0]); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(4, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col4", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1006, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1006, lcsd.getLowValue(), 0.01); + Assert.assertEquals(45, lcsd.getNumNulls()); + Assert.assertEquals(3, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col4")); + statChecker.checkStats(aggrStats); + } + + @Test + public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusLong() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col5_long", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col5_long"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData dcsd = new LongColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors[i / 5]); + data.setLongStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(6, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col5_long", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(1010, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01); + Assert.assertEquals(40, lcsd.getNumNulls()); + Assert.assertEquals(12, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col5_long")); + statChecker.checkStats(aggrStats); + } + + @Test + public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusDecimal() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col5_decimal", "decimal", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col5_decimal"); + obj.setColType("decimal"); + ColumnStatisticsData data = new ColumnStatisticsData(); + DecimalColumnStatsData dcsd = new DecimalColumnStatsData(); + dcsd.setHighValue(StatObjectConverter.createThriftDecimal("" + (1000 + i))); + dcsd.setLowValue(StatObjectConverter.createThriftDecimal("" + (-1000 - i))); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors[i / 5]); + data.setDecimalStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(6, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col5_decimal", cso.getColName()); + Assert.assertEquals("decimal", cso.getColType()); + DecimalColumnStatsData lcsd = cso.getStatsData().getDecimalStats(); + Assert.assertEquals(1010, HBaseUtils.getDoubleValue(lcsd.getHighValue()), 0.01); + Assert.assertEquals(-1010, HBaseUtils.getDoubleValue(lcsd.getLowValue()), 0.01); + Assert.assertEquals(40, lcsd.getNumNulls()); + Assert.assertEquals(12, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col5_decimal")); + statChecker.checkStats(aggrStats); + } + + @Test + public void TwoEndsAndMiddleOfPartitionsHaveBitVectorStatusDouble() throws Exception { + String dbName = "default"; + String tableName = "snp"; + long now = System.currentTimeMillis(); + List cols = new ArrayList<>(); + cols.add(new FieldSchema("col5_double", "double", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections. emptyMap()); + List partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections. emptyMap(), null, null, null); + store.createTable(table); + + List> partVals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + List partVal = Arrays.asList("" + i); + partVals.add(partVal); + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVal); + Partition part = new Partition(partVal, dbName, tableName, (int) now, (int) now, psd, + Collections. 
emptyMap()); + store.addPartition(part); + if (i == 0 || i == 2 || i == 3 || i == 5 || i == 6 || i == 8) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVal); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col5_double"); + obj.setColType("double"); + ColumnStatisticsData data = new ColumnStatisticsData(); + DoubleColumnStatsData dcsd = new DoubleColumnStatsData(); + dcsd.setHighValue(1000 + i); + dcsd.setLowValue(-1000 - i); + dcsd.setNumNulls(i); + dcsd.setNumDVs(10 * i + 1); + dcsd.setBitVectors(bitVectors[i / 5]); + data.setDoubleStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + store.updatePartitionColumnStatistics(cs, partVal); + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(6, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col5_double", cso.getColName()); + Assert.assertEquals("double", cso.getColType()); + DoubleColumnStatsData lcsd = cso.getStatsData().getDoubleStats(); + Assert.assertEquals(1010, lcsd.getHighValue(), 0.01); + Assert.assertEquals(-1010, lcsd.getLowValue(), 0.01); + Assert.assertEquals(40, lcsd.getNumNulls()); + Assert.assertEquals(12, lcsd.getNumDVs()); + } + }; + List partNames = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + partNames.add("ds=" + i); + } + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, partNames, + Arrays.asList("col5_double")); + statChecker.checkStats(aggrStats); + } +}
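
For reference, the extrapolation introduced in this patch reduces to a linear projection of the per-partition low/high values out to the borders of the full partition range, a null count scaled up by numParts / numPartsWithStats, and an optional density-based NDV estimate clamped between the largest single-partition NDV and the sum of all per-partition NDVs. Below is a minimal standalone sketch of that arithmetic, separate from the metastore types; every name in it (ExtrapolationSketch, extrapolateHigh, scaleNumNulls, estimateNdvFromDensity) is illustrative only and not part of the patch, and the sample run assumes the adjusted indexes are simply the partition positions 0..9, which reproduces the expected values in TwoEndsOfPartitionsHaveBitVectorStatus above.

// Minimal sketch of the extrapolation arithmetic; names are illustrative and
// not part of the patch. The real code operates on LongColumnStatsData and is
// driven by adjustedIndexMap and rightBorderInd as shown in the diff.
public class ExtrapolationSketch {

  // Projects the largest observed value out to the right border of the
  // partition range, assuming values change roughly linearly with the
  // (adjusted) partition index. minInd/maxInd are the indexes of the
  // partitions holding the smallest and largest observed values.
  static long extrapolateHigh(long min, long max, double minInd, double maxInd,
      double rightBorderInd) {
    if (minInd == maxInd) {
      return min;
    } else if (minInd < maxInd) {
      // the maximum sits toward the right border
      return (long) (min + (max - min) * (rightBorderInd - minInd) / (maxInd - minInd));
    } else {
      // the maximum sits toward the left border
      return (long) (min + (max - min) * minInd / (minInd - maxInd));
    }
  }

  // Null counts are summed over the partitions that have stats and then
  // scaled up to the full partition count.
  static long scaleNumNulls(long sumNulls, int numParts, int numPartsWithStats) {
    return sumNulls * numParts / numPartsWithStats;
  }

  // Density-based NDV: value range divided by the average density, clamped
  // between the largest single-partition NDV (lowerBound) and the sum of all
  // per-partition NDVs (higherBound). Only used when the density function is
  // enabled and densityAvg != 0.
  static long estimateNdvFromDensity(long lowValue, long highValue, double densityAvg,
      long lowerBound, long higherBound) {
    long ndv = (long) ((highValue - lowValue) / densityAvg);
    return Math.max(lowerBound, Math.min(ndv, higherBound));
  }

  public static void main(String[] args) {
    // Numbers from the TwoEndsOfPartitionsHaveBitVectorStatus test: partitions
    // 0, 1, 8, 9 (of 10) have stats, with high values 1000, 1001, 1008, 1009
    // and numNulls 0, 1, 8, 9.
    System.out.println(extrapolateHigh(1000, 1009, 0, 9, 10)); // 1010
    System.out.println(scaleNumNulls(0 + 1 + 8 + 9, 10, 4));   // 45
    // Toy numbers: range 2020 at density 100 gives 20, within [3, 28].
    System.out.println(estimateNdvFromDensity(-1010, 1010, 100.0, 3, 28)); // 20
  }
}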