diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 8d861e4..44f2825 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.FileMetadataHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo;
 import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -966,4 +967,11 @@ public void dropConstraint(String dbName, String tableName,
   public String getMetastoreDbUuid() throws MetaException {
     throw new MetaException("getMetastoreDbUuid is not implemented");
   }
+
+  @Override
+  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
+      throws MetaException, NoSuchObjectException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index dc1245e..dcd8a08 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
@@ -1429,8 +1430,6 @@ private long partsFoundForPartitions(final String dbName, final String tableName
     });
   }
 
-  // Get aggregated column stats for a table per partition for all columns in the partition
-  // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm)
   public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tblName, boolean enableBitVector) throws MetaException {
     String queryText = "select \"PARTITION_NAME\", " + getStatsList(enableBitVector) + " from "
         + " " + PART_COL_STATS
@@ -1483,27 +1482,62 @@ private long partsFoundForPartitions(final String dbName, final String tableName
     return partColStatsMap;
   }
 
+  public List<ColStatsObjWithSourceInfo> getColStatsForAllTablePartitions(String dbName,
+      boolean enableBitVector) throws MetaException {
+    String queryText = "select \"TABLE_NAME\", \"PARTITION_NAME\", "
+        + getStatsList(enableBitVector) + " from " + " " + PART_COL_STATS
+        + " where \"DB_NAME\" = ?";
+    long start = 0;
+    long end = 0;
+    Query query = null;
+    boolean doTrace = LOG.isDebugEnabled();
+    Object qResult = null;
+    start = doTrace ? System.nanoTime() : 0;
+    query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    qResult = executeWithArray(query, new Object[] { dbName }, queryText);
+    if (qResult == null) {
+      query.closeAll();
+      return Lists.newArrayList();
+    }
+    end = doTrace ?
System.nanoTime() : 0; + timingTrace(doTrace, queryText, start, end); + List list = ensureList(qResult); + List colStatsForDB = new ArrayList(); + for (Object[] row : list) { + String tblName = (String) row[0]; + String partName = (String) row[1]; + ColumnStatisticsObj colStatObj = prepareCSObj(row, 2); + colStatsForDB.add(new ColStatsObjWithSourceInfo(colStatObj, dbName, tblName, partName)); + Deadline.checkTimeout(); + } + query.closeAll(); + return colStatsForDB; + } + + /** Should be called with the list short enough to not trip up Oracle/etc. */ private List columnStatisticsObjForPartitionsBatch(String dbName, String tableName, List partNames, List colNames, boolean areAllPartsFound, - boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector) throws MetaException { - if(enableBitVector) { - return aggrStatsUseJava(dbName, tableName, partNames, colNames, useDensityFunctionForNDVEstimation, ndvTuner); - } - else { - return aggrStatsUseDB(dbName, tableName, partNames, colNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); + boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector) + throws MetaException { + if (enableBitVector) { + return aggrStatsUseJava(dbName, tableName, partNames, colNames, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); + } else { + return aggrStatsUseDB(dbName, tableName, partNames, colNames, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); } } private List aggrStatsUseJava(String dbName, String tableName, - List partNames, List colNames, + List partNames, List colNames, boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { // 1. get all the stats for colNames in partNames; - List partStats = getPartitionStats(dbName, tableName, partNames, colNames, - true); + List partStats = + getPartitionStats(dbName, tableName, partNames, colNames, true); // 2. use util function to aggr stats return MetaStoreUtils.aggrPartitionStats(partStats, dbName, tableName, partNames, colNames, - useDensityFunctionForNDVEstimation, ndvTuner); + areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); } private List aggrStatsUseDB(String dbName, diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index bbe13fd..731847b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -1967,74 +1967,86 @@ public static MetaException newMetaException(String errorMessage, Exception e) { return cols; } - // given a list of partStats, this function will give you an aggr stats + // Given a list of partStats, this function will give you an aggr stats public static List aggrPartitionStats(List partStats, String dbName, String tableName, List partNames, List colNames, - boolean useDensityFunctionForNDVEstimation, double ndvTuner) + boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { - // 1. 
group by the stats by colNames - // map the colName to List - Map> map = new HashMap<>(); + Map> colStatsMap = + new HashMap>(); + // Group by the stats by colNames for (ColumnStatistics css : partStats) { List objs = css.getStatsObj(); + String partName = css.getStatsDesc().getPartName(); for (ColumnStatisticsObj obj : objs) { - List singleObj = new ArrayList<>(); - singleObj.add(obj); - ColumnStatistics singleCS = new ColumnStatistics(css.getStatsDesc(), singleObj); - if (!map.containsKey(obj.getColName())) { - map.put(obj.getColName(), new ArrayList()); + ColumnStatsAggregator colStatsAggregator = + ColumnStatsAggregatorFactory.getColumnStatsAggregator(obj.getStatsData().getSetField(), + useDensityFunctionForNDVEstimation, ndvTuner); + if (!colStatsMap.containsKey(colStatsAggregator)) { + colStatsMap.put(colStatsAggregator, new ArrayList()); } - map.get(obj.getColName()).add(singleCS); + colStatsMap.get(colStatsAggregator).add( + new ColStatsObjWithSourceInfo(obj, dbName, tableName, partName)); } } - return aggrPartitionStats(map,dbName,tableName,partNames,colNames,useDensityFunctionForNDVEstimation, ndvTuner); + if (colStatsMap.size() < 1) { + LOG.info("No stats data found for: dbName=" + dbName + " tblName=" + tableName + + " partNames= " + partNames + " colNames=" + colNames); + return new ArrayList(); + } + return aggrPartitionStats(colStatsMap, partNames, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); } public static List aggrPartitionStats( - Map> map, String dbName, String tableName, - final List partNames, List colNames, - final boolean useDensityFunctionForNDVEstimation,final double ndvTuner) throws MetaException { - List colStats = new ArrayList<>(); - // 2. Aggregate stats for each column in a separate thread - if (map.size()< 1) { - //stats are absent in RDBMS - LOG.debug("No stats data found for: dbName=" +dbName +" tblName=" + tableName + - " partNames= " + partNames + " colNames=" + colNames ); - return colStats; - } - final ExecutorService pool = Executors.newFixedThreadPool(Math.min(map.size(), 16), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("aggr-col-stats-%d").build()); + Map> colStatsMap, + final List partNames, final boolean areAllPartsFound, + final boolean useDensityFunctionForNDVEstimation, final double ndvTuner) throws MetaException { + List aggrColStatObjs = new ArrayList(); + final ExecutorService pool = + Executors.newFixedThreadPool(Math.min(colStatsMap.size(), 16), new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("aggr-col-stats-%d").build()); final List> futures = Lists.newLinkedList(); - + LOG.info("Aggregating column stats. 
Threads used: " + Math.min(colStatsMap.size(), 16)); long start = System.currentTimeMillis(); - for (final Entry> entry : map.entrySet()) { + for (final Entry> entry : colStatsMap + .entrySet()) { futures.add(pool.submit(new Callable() { @Override - public ColumnStatisticsObj call() throws Exception { - List css = entry.getValue(); - ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(css - .iterator().next().getStatsObj().iterator().next().getStatsData().getSetField(), - useDensityFunctionForNDVEstimation, ndvTuner); - ColumnStatisticsObj statsObj = aggregator.aggregate(entry.getKey(), partNames, css); - return statsObj; - }})); + public ColumnStatisticsObj call() throws MetaException { + List colStatWithPartInfo = entry.getValue(); + ColumnStatsAggregator aggregator = entry.getKey(); + try { + ColumnStatisticsObj statsObj = + aggregator.aggregate(colStatWithPartInfo, partNames, areAllPartsFound); + return statsObj; + } catch (MetaException e) { + e.printStackTrace(); + LOG.debug(e.toString()); + throw e; + } + } + })); } pool.shutdown(); - for (Future future : futures) { - try { - colStats.add(future.get()); - } catch (InterruptedException | ExecutionException e) { - pool.shutdownNow(); - LOG.debug(e.toString()); - throw new MetaException(e.toString()); + if (!futures.isEmpty()) { + for (Future future : futures) { + try { + if ((future != null) && (future.get() != null)) { + aggrColStatObjs.add(future.get()); + } + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + LOG.debug(e.toString()); + throw new MetaException(e.toString()); + } } } - LOG.debug("Time for aggr col stats in seconds: {} Threads used: {}", - ((System.currentTimeMillis() - (double)start))/1000, Math.min(map.size(), 16)); - return colStats; + LOG.info("Time for aggr col stats in seconds " + + ((System.currentTimeMillis() - (double) start)) / 1000); + return aggrColStatObjs; } - /** * Produce a hash for the storage descriptor * @param sd storage descriptor to hash @@ -2119,4 +2131,36 @@ public ColumnStatisticsObj call() throws Exception { public static double decimalToDouble(Decimal decimal) { return new BigDecimal(new BigInteger(decimal.getUnscaled()), decimal.getScale()).doubleValue(); } + + // ColumnStatisticsObj with info about its db, table, partition (if table is partitioned) + public static class ColStatsObjWithSourceInfo { + private ColumnStatisticsObj colStatsObj; + private String dbName; + private String tblName; + private String partName; + + public ColStatsObjWithSourceInfo(ColumnStatisticsObj colStatsObj, String dbName, + String tblName, String partName) { + this.colStatsObj = colStatsObj; + this.dbName = dbName; + this.tblName = tblName; + this.partName = partName; + } + + public ColumnStatisticsObj getColStatsObj() { + return colStatsObj; + } + + public String getDbName() { + return dbName; + } + + public String getTblName() { + return tblName; + } + + public String getPartName() { + return partName; + } + } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 3053dcb..32b48bd 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -63,6 +63,7 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.MetricRegistry; + import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.exception.ExceptionUtils; import 
org.apache.hadoop.conf.Configurable;
@@ -76,6 +77,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreDirectSql.SqlFilterForPushdown;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
@@ -7400,6 +7402,36 @@ protected String describeResult() {
   }
 
   @Override
+  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
+      throws MetaException, NoSuchObjectException {
+    final boolean enableBitVector =
+        HiveConf.getBoolVar(getConf(), HiveConf.ConfVars.HIVE_STATS_FETCH_BITVECTOR);
+    return new GetHelper<List<ColStatsObjWithSourceInfo>>(dbName, null, true, false) {
+      @Override
+      protected List<ColStatsObjWithSourceInfo> getSqlResult(
+          GetHelper<List<ColStatsObjWithSourceInfo>> ctx) throws MetaException {
+        return directSql.getColStatsForAllTablePartitions(dbName, enableBitVector);
+      }
+
+      @Override
+      protected List<ColStatsObjWithSourceInfo> getJdoResult(
+          GetHelper<List<ColStatsObjWithSourceInfo>> ctx)
+          throws MetaException, NoSuchObjectException {
+        // This is the fast path for query optimizations; if we can find this info
+        // quickly using directSql, do it. There is no point falling back to the slow path here.
+        throw new MetaException("Jdo path is not implemented for getPartitionColStatsForDatabase.");
+      }
+
+      @Override
+      protected String describeResult() {
+        return null;
+      }
+    }.run(true);
+  }
+
+  @Override
   public void flushCache() {
     // NOP as there's no caching
   }
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index 71982a0..bf1400c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -596,6 +597,18 @@ public AggrStats get_aggr_stats_for(String dbName, String tblName,
       String tableName) throws MetaException, NoSuchObjectException;
 
   /**
+   * Get column stats for all partitions of all tables in the database.
+   *
+   * @param dbName name of the database
+   * @return list of column stats objects for all partitions of all tables in
+   *         the database
+   * @throws NoSuchObjectException
+   * @throws MetaException
+   */
+  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
+      throws MetaException, NoSuchObjectException;
+
+  /**
    * Get the next notification event.
   * @param rqst Request containing information on the last processed notification.
* @return list of notifications, sorted by eventId diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 3ba81ce..c607c25 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -35,9 +35,11 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Deadline; import org.apache.hadoop.hive.metastore.FileMetadataHandler; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.ObjectStore; import org.apache.hadoop.hive.metastore.PartFilterExprUtil; import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; @@ -50,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; @@ -82,6 +85,9 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType; +import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator; +import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hive.common.util.HiveStringUtils; import org.apache.thrift.TException; @@ -233,6 +239,14 @@ void prewarm() throws Exception { List dbNames = rawStore.getAllDatabases(); for (String dbName : dbNames) { Database db = rawStore.getDatabase(dbName); + // Cache partition column stats + Deadline.startTimer("getColStatsForDatabase"); + List colStatsForDB = + rawStore.getPartitionColStatsForDatabase(dbName); + Deadline.stopTimer(); + if (colStatsForDB != null) { + SharedCache.addPartitionColStatsToCache(colStatsForDB); + } SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(dbName), db); List tblNames = rawStore.getAllTables(dbName); for (String tblName : tblNames) { @@ -246,14 +260,6 @@ void prewarm() throws Exception { SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), partition); } - // Cache partition column stats - Deadline.startTimer("getColStatsForTablePartitions"); - Map> colStatsPerPartition = - rawStore.getColStatsForTablePartitions(dbName, tblName); - Deadline.stopTimer(); - if (colStatsPerPartition != null) { - SharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition); - } // Cache table column stats List colNames = MetaStoreUtils.getColumnNamesForTable(table); Deadline.startTimer("getTableColumnStatistics"); @@ -264,6 +270,34 @@ void prewarm() throws Exception { SharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName), 
HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); } + // Cache aggregate stats for all partitions of a table and for all but + // default partition + List partNames = listPartitionNames(dbName, tblName, (short) -1); + if ((partNames != null) && (partNames.size() > 0)) { + Deadline.startTimer("getColStatsForAllPartitions"); + AggrStats aggrStatsAllPartitions = + get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate + // stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList(); + List partVals = new ArrayList(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getColStatsForAllPartitionsExceptDefault"); + AggrStats aggrStatsAllButDefaultPartition = + get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + SharedCache.addAggregateStatsToCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); + } } } } @@ -1529,14 +1563,35 @@ public boolean deletePartitionColumnStatistics(String dbName, String tableName, } @Override - public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, - List colNames) throws MetaException, NoSuchObjectException { - List colStats = mergeColStatsForPartitions( - HiveStringUtils.normalizeIdentifier(dbName), HiveStringUtils.normalizeIdentifier(tblName), - partNames, colNames); - return new AggrStats(colStats, partNames.size()); - - } + public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, + List colNames) throws MetaException, NoSuchObjectException { + List colStats; + List allPartNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + if (partNames.size() == allPartNames.size()) { + colStats = + SharedCache.getAggrStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), colNames, StatsType.ALL); + if (colStats != null) { + return new AggrStats(colStats, partNames.size()); + } + } else if (partNames.size() == (allPartNames.size() - 1)) { + String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME); + if (!partNames.contains(defaultPartitionName)) { + colStats = + SharedCache.getAggrStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), colNames, StatsType.ALLBUTDEFAULT); + if (colStats != null) { + return new AggrStats(colStats, partNames.size()); + } + } + } + LOG.info("Didn't find aggr stats in cache. Merging them. 
tblName=" + tblName + " parts=" + + partNames + " cols=" + colNames); + colStats = + mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tblName), partNames, colNames); + return new AggrStats(colStats, partNames.size()); + } private List mergeColStatsForPartitions(String dbName, String tblName, List partNames, List colNames) throws MetaException { @@ -1544,36 +1599,47 @@ public AggrStats get_aggr_stats_for(String dbName, String tblName, List HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION); final double ndvTuner = HiveConf.getFloatVar(getConf(), HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER); - Map> map = new HashMap<>(); - + Map> colStatsMap = + new HashMap>(); + boolean areAllPartsFound = true; for (String colName : colNames) { - List colStats = new ArrayList<>(); + ColumnStatsAggregator colStatsAggregator = null; + List colStatsWithPartInfoList = new ArrayList(); for (String partName : partNames) { String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); - List colStat = new ArrayList<>(); ColumnStatisticsObj colStatsForPart = SharedCache .getCachedPartitionColStats(colStatsCacheKey); if (colStatsForPart != null) { - colStat.add(colStatsForPart); - ColumnStatisticsDesc csDesc = new ColumnStatisticsDesc(false, dbName, tblName); - csDesc.setPartName(partName); - colStats.add(new ColumnStatistics(csDesc, colStat)); + ColStatsObjWithSourceInfo colStatsWithPartInfo = new ColStatsObjWithSourceInfo( + colStatsForPart, dbName, tblName, partName); + colStatsWithPartInfoList.add(colStatsWithPartInfo); + if (colStatsAggregator == null) { + colStatsAggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator( + colStatsForPart.getStatsData().getSetField(), useDensityFunctionForNDVEstimation, + ndvTuner); + } } else { - LOG.debug("Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}", - dbName, tblName,partName, colName); + LOG.info("Stats not found for: dbName=" + dbName + " tblName=" + tblName + " partName= " + + partName + " colName=" + colName); + areAllPartsFound = false; } } - map.put(colName, colStats); + colStatsMap.put(colStatsAggregator, colStatsWithPartInfoList); + } + if (colStatsMap.size() <= 1) { + LOG.info("No stats data found for: dbName=" + dbName + " tblName=" + tblName + " partNames= " + + partNames + " colNames=" + colNames); + return new ArrayList(); } + // Note that enableBitVector does not apply here because ColumnStatisticsObj - // itself will tell whether - // bitvector is null or not and aggr logic can automatically apply. - return MetaStoreUtils.aggrPartitionStats(map, dbName, tblName, partNames, colNames, + // itself will tell whether bitvector is null or not and aggr logic can + // automatically apply. 
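An aside on the routing above: get_aggr_stats_for serves a request straight from the aggregate cache only when the caller asks for all partitions, which maps to StatsType.ALL, or for all partitions except the default one, which maps to StatsType.ALLBUTDEFAULT; any other subset falls through to this merge path. A minimal, self-contained sketch of that decision follows; the class and method names are illustrative, not Hive's API.

import java.util.Arrays;
import java.util.List;

public class AggrStatsRouting {
  enum StatsType { ALL, ALLBUTDEFAULT, NONE }

  // Decide which precomputed aggregate, if any, can serve the request.
  static StatsType route(List<String> requested, List<String> all, String defaultPartName) {
    if (requested.size() == all.size()) {
      return StatsType.ALL; // caller asked for every partition
    }
    if (requested.size() == all.size() - 1 && !requested.contains(defaultPartName)) {
      return StatsType.ALLBUTDEFAULT; // every partition except the default one
    }
    return StatsType.NONE; // arbitrary subset: merge per-partition stats instead
  }

  public static void main(String[] args) {
    List<String> all = Arrays.asList("ds=1", "ds=2", "ds=__HIVE_DEFAULT_PARTITION__");
    System.out.println(route(Arrays.asList("ds=1", "ds=2"), all,
        "ds=__HIVE_DEFAULT_PARTITION__")); // prints ALLBUTDEFAULT
  }
}

Note that the patch compares only list sizes for the ALL case, so it implicitly assumes the requested partNames came from listPartitionNames on the same table.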
+ return MetaStoreUtils.aggrPartitionStats(colStatsMap, partNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); } - @Override public long cleanupEvents() { return rawStore.cleanupEvents(); @@ -1931,6 +1997,12 @@ public void dropConstraint(String dbName, String tableName, return rawStore.getColStatsForTablePartitions(dbName, tableName); } + @Override + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + return rawStore.getPartitionColStatsForDatabase(dbName); + } + public RawStore getRawStore() { return rawStore; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 80b17e0..51e62bb 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -28,8 +28,10 @@ import java.util.TreeMap; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.StatObjectConverter; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -58,8 +60,23 @@ new TreeMap(); private static Map sdCache = new HashMap(); + private static Map> aggrColStatsCache = new HashMap>(); private static MessageDigest md; + static enum StatsType { + ALL(0), ALLBUTDEFAULT(1); + + private final int position; + + private StatsType(int position) { + this.position = position; + } + + public int getPosition() { + return position; + } + } + static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName()); static { @@ -429,6 +446,24 @@ public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null; } + public static synchronized void addPartitionColStatsToCache( + List colStatsForDB) { + for (ColStatsObjWithSourceInfo colStatWithSourceInfo : colStatsForDB) { + List partVals; + try { + partVals = Warehouse.getPartValuesFromPartName(colStatWithSourceInfo.getPartName()); + ColumnStatisticsObj colStatObj = colStatWithSourceInfo.getColStatsObj(); + String key = + CacheUtils.buildKey(colStatWithSourceInfo.getDbName(), + colStatWithSourceInfo.getTblName(), partVals, colStatObj.getColName()); + partitionColStatsCache.put(key, colStatObj); + } catch (MetaException e) { + LOG.info("Unable to add partition stats for: " + colStatWithSourceInfo.getPartName() + + " to SharedCache", e); + } + } + } + public static synchronized void addPartitionColStatsToCache(String dbName, String tableName, Map> colStatsPerPartition) { for (Map.Entry> entry : colStatsPerPartition.entrySet()) { @@ -471,6 +506,48 @@ public static synchronized void refreshTableColStats(String dbName, String table addTableColStatsToCache(dbName, tableName, colStatsForTable); } + public static synchronized void addAggregateStatsToCache(String dbName, String tblName, + AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { + if (aggrStatsAllPartitions != null) { + for (ColumnStatisticsObj colStatObj : aggrStatsAllPartitions.getColStats()) { + String key = CacheUtils.buildKey(dbName, tblName, 
colStatObj.getColName()); + List value = new ArrayList(); + value.add(StatsType.ALL.getPosition(), colStatObj); + aggrColStatsCache.put(key, value); + } + } + if (aggrStatsAllButDefaultPartition != null) { + for (ColumnStatisticsObj colStatObj : aggrStatsAllButDefaultPartition.getColStats()) { + String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName()); + List value = aggrColStatsCache.get(key); + if ((value != null) && (value.size() > 0)) { + value.add(StatsType.ALLBUTDEFAULT.getPosition(), colStatObj); + } + } + } + } + + public static synchronized List getAggrStatsFromCache(String dbName, + String tblName, List colNames, StatsType statsType) { + List colStats = new ArrayList(); + for (String colName : colNames) { + String key = CacheUtils.buildKey(dbName, tblName, colName); + List colStatList = aggrColStatsCache.get(key); + // If unable to find stats for a column, return null so we can build stats + if (colStatList == null) { + return null; + } + ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition()); + // If unable to find stats for this StatsType, return null so we can build + // stats + if (colStatObj == null) { + return null; + } + colStats.add(colStatObj); + } + return colStats; + } + public static void increSd(StorageDescriptor sd, byte[] sdHash) { ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); if (sdCache.containsKey(byteArray)) { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java index e6c836b..5873e5e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BinaryColumnStatsAggregator.java @@ -21,8 +21,8 @@ import java.util.List; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -30,19 +30,16 @@ public class BinaryColumnStatsAggregator extends ColumnStatsAggregator { @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithPartInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - BinaryColumnStatsData aggregateData = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + BinaryColumnStatsData aggregateData = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso .getStatsData().getSetField()); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java index a34bc9f..e198b0c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/BooleanColumnStatsAggregator.java @@ -21,8 +21,8 @@ import java.util.List; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -30,19 +30,17 @@ public class BooleanColumnStatsAggregator extends ColumnStatsAggregator { @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithPartInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - BooleanColumnStatsData aggregateData = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + BooleanColumnStatsData aggregateData = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); + colType = cso.getColType(); colType = cso.getColType(); statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso .getStatsData().getSetField()); @@ -61,5 +59,4 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, statsObj.setStatsData(columnStatisticsData); return statsObj; } - -} +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java index a52e5e5..ea4325c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregator.java @@ -21,13 +21,14 @@ import java.util.List; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; public abstract class ColumnStatsAggregator { public boolean useDensityFunctionForNDVEstimation; public double ndvTuner; - public abstract ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException; + + public abstract ColumnStatisticsObj aggregate(List colStatsWithPartInfo, + List partNames, boolean areAllPartsFound) throws MetaException; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java index dfae708..c93302e 100644 --- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/ColumnStatsAggregatorFactory.java @@ -24,19 +24,25 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector; -import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector; -import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector; -import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector; -import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; +import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ColumnStatsAggregatorFactory { + private static final Logger LOG = LoggerFactory.getLogger(ColumnStatsAggregatorFactory.class.getName()); private ColumnStatsAggregatorFactory() { } public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type, boolean useDensityFunctionForNDVEstimation, double ndvTuner) { + + LOG.debug("Returning ColumnStatsAggregator for type: " + type.getFieldName()); + ColumnStatsAggregator agg; switch (type) { case BOOLEAN_STATS: @@ -79,19 +85,19 @@ public static ColumnStatisticsObj newColumnStaticsObj(String colName, String col break; case LONG_STATS: - csd.setLongStats(new LongColumnStatsDataInspector()); + csd.setLongStats(new LongColumnStatsData()); break; case DATE_STATS: - csd.setDateStats(new DateColumnStatsDataInspector()); + csd.setDateStats(new DateColumnStatsData()); break; case DOUBLE_STATS: - csd.setDoubleStats(new DoubleColumnStatsDataInspector()); + csd.setDoubleStats(new DoubleColumnStatsData()); break; case STRING_STATS: - csd.setStringStats(new StringColumnStatsDataInspector()); + csd.setStringStats(new StringColumnStatsData()); break; case BINARY_STATS: @@ -99,7 +105,7 @@ public static ColumnStatisticsObj newColumnStaticsObj(String colName, String col break; case DECIMAL_STATS: - csd.setDecimalStats(new DecimalColumnStatsDataInspector()); + csd.setDecimalStats(new DecimalColumnStatsData()); break; default: diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java index ee95396..b45baf4 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DateColumnStatsAggregator.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import 
org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Date; @@ -44,30 +44,23 @@ private static final Logger LOG = LoggerFactory.getLogger(DateColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithPartInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - - // check if all the ColumnStatisticsObjs contain stats and all the ndv are - // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); - NumDistinctValueEstimator ndvEstimator = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + // check if all the ColumnStatisticsObjs contain stats and all the ndv are bitvectors + NumDistinctValueEstimator ndvEstimator = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso .getStatsData().getSetField()); } - DateColumnStatsDataInspector dateColumnStats = - (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); + DateColumnStatsDataInspector dateColumnStats = (DateColumnStatsDataInspector) cso + .getStatsData().getDateStats(); if (dateColumnStats.getNdvEstimator() == null) { ndvEstimator = null; break; @@ -86,21 +79,22 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } + LOG.debug("doAllPartitionContainStats for " + colName + " is " + areAllPartsFound); if (ndvEstimator != null) { ndvEstimator = NumDistinctValueEstimatorFactory .getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (areAllPartsFound || colStatsWithPartInfo.size() < 2) { DateColumnStatsDataInspector aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DateColumnStatsDataInspector newData = - (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + DateColumnStatsDataInspector newData = (DateColumnStatsDataInspector) cso.getStatsData() + .getDateStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); higherBound += newData.getNumDVs(); densityAvgSum += (diff(newData.getHighValue(), newData.getLowValue())) @@ -112,8 +106,7 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, aggregateData = newData.deepCopy(); } else { aggregateData.setLowValue(min(aggregateData.getLowValue(), newData.getLowValue())); - aggregateData - .setHighValue(max(aggregateData.getHighValue(), newData.getHighValue())); + 
aggregateData.setHighValue(max(aggregateData.getHighValue(), newData.getHighValue())); aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); } @@ -157,12 +150,13 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DateColumnStatsData newData = cso.getStatsData().getDateStats(); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) / newData.getNumDVs(); + densityAvgSum += diff(newData.getHighValue(), newData.getLowValue()) + / newData.getNumDVs(); } adjustedIndexMap.put(partName, (double) indexMap.get(partName)); adjustedStatsMap.put(partName, cso.getStatsData()); @@ -175,11 +169,11 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; DateColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DateColumnStatsDataInspector newData = - (DateColumnStatsDataInspector) cso.getStatsData().getDateStats(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); + DateColumnStatsDataInspector newData = (DateColumnStatsDataInspector) cso.getStatsData() + .getDateStats(); // newData.isSetBitVectors() should be true for sure because we // already checked it before. if (indexMap.get(partName) != curIndex) { @@ -199,7 +193,8 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = NumDistinctValueEstimatorFactory + .getEmptyNumDistinctValueEstimator(ndvEstimator); } aggregateData = null; } @@ -230,11 +225,12 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithPartInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getDateStats().getNumDVs(),partNames.size(), css.size()); + LOG.info("Ndv estimatation for " + colName + " is " + + columnStatisticsData.getDateStats().getNumDVs() + ". # of partitions requested: " + + partNames.size() + ". 
# of partitions found: " + colStatsWithPartInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java index 284c12c..c3cd0ee 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DecimalColumnStatsAggregator.java @@ -29,13 +29,12 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.StatObjectConverter; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,36 +44,31 @@ private static final Logger LOG = LoggerFactory.getLogger(DecimalColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithPartInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - - // check if all the ColumnStatisticsObjs contain stats and all the ndv are - // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); - NumDistinctValueEstimator ndvEstimator = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + // check if all the ColumnStatisticsObjs contain stats and all the ndv are bitvectors + NumDistinctValueEstimator ndvEstimator = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); - statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso - .getStatsData().getSetField()); + statsObj = + ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso.getStatsData() + .getSetField()); } - DecimalColumnStatsDataInspector decimalColumnStatsData = - (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); - if (decimalColumnStatsData.getNdvEstimator() == null) { + if (!cso.getStatsData().getDecimalStats().isSetBitVectors() + || cso.getStatsData().getDecimalStats().getBitVectors().length == 0) { ndvEstimator = null; break; } else { // check if all of the bit vectors can merge - NumDistinctValueEstimator estimator = decimalColumnStatsData.getNdvEstimator(); + NumDistinctValueEstimator estimator = + 
NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(cso.getStatsData() + .getDecimalStats().getBitVectors()); if (ndvEstimator == null) { ndvEstimator = estimator; } else { @@ -87,27 +81,29 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } + LOG.debug("Do we have stats for all partitions for " + colName + " : " + areAllPartsFound); if (ndvEstimator != null) { - ndvEstimator = NumDistinctValueEstimatorFactory - .getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { - DecimalColumnStatsDataInspector aggregateData = null; + if (areAllPartsFound || colStatsWithPartInfo.size() < 2) { + DecimalColumnStatsData aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DecimalColumnStatsDataInspector newData = - (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); higherBound += newData.getNumDVs(); - densityAvgSum += (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils - .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); + densityAvgSum += + (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils + .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); if (ndvEstimator != null) { - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); + ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory + .getNumDistinctValueEstimator(newData.getBitVectors())); } if (aggregateData == null) { aggregateData = newData.deepCopy(); @@ -139,8 +135,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, // We have estimation, lowerbound and higherbound. We use estimation // if it is between lowerbound and higherbound. double densityAvg = densityAvgSum / partNames.size(); - estimation = (long) ((MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils - .decimalToDouble(aggregateData.getLowValue())) / densityAvg); + estimation = + (long) ((MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils + .decimalToDouble(aggregateData.getLowValue())) / densityAvg); if (estimation < lowerBound) { estimation = lowerBound; } else if (estimation > higherBound) { @@ -160,20 +157,22 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, indexMap.put(partNames.get(index), index); } Map adjustedIndexMap = new HashMap(); - Map adjustedStatsMap = new HashMap(); + Map adjustedStatsMap = + new HashMap(); // while we scan the css, we also get the densityAvg, lowerbound and // higerbound when useDensityFunctionForNDVEstimation is true. double densityAvgSum = 0.0; if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
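Context for the fallback below: when every partition carries a mergeable bitvector, the aggregate NDV comes from the merged estimator; otherwise, with useDensityFunctionForNDVEstimation enabled, the aggregators derive NDV from the value range divided by the average per-partition density and clamp it between the largest single-partition NDV and the sum of all per-partition NDVs, exactly as the surrounding Decimal code does. A compact sketch of that clamp, using plain doubles in place of Hive's Decimal:

public class NdvDensityEstimate {
  // densityAvg is the average of (high - low) / ndv over all partitions with stats.
  static long estimateNdv(double low, double high, double densityAvg,
      long maxPartitionNdv, long sumOfNdvs) {
    long estimation = (long) ((high - low) / densityAvg);
    if (estimation < maxPartitionNdv) {
      return maxPartitionNdv; // cannot have fewer NDVs than the richest single partition
    }
    if (estimation > sumOfNdvs) {
      return sumOfNdvs; // cannot have more NDVs than all partitions combined
    }
    return estimation;
  }

  public static void main(String[] args) {
    // e.g. partitions with NDVs 100/120/80 (sum 300, max 120) over the range [0, 1000]
    System.out.println(estimateNdv(0.0, 1000.0, 4.0, 120, 300)); // prints 250
  }
}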
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats(); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils - .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); + densityAvgSum += + (MetaStoreUtils.decimalToDouble(newData.getHighValue()) - MetaStoreUtils + .decimalToDouble(newData.getLowValue())) / newData.getNumDVs(); } adjustedIndexMap.put(partName, (double) indexMap.get(partName)); adjustedStatsMap.put(partName, cso.getStatsData()); @@ -185,12 +184,11 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, double pseudoIndexSum = 0; int length = 0; int curIndex = -1; - DecimalColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DecimalColumnStatsDataInspector newData = - (DecimalColumnStatsDataInspector) cso.getStatsData().getDecimalStats(); + DecimalColumnStatsData aggregateData = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); + DecimalColumnStatsData newData = cso.getStatsData().getDecimalStats(); // newData.isSetBitVectors() should be true for sure because we // already checked it before. if (indexMap.get(partName) != curIndex) { @@ -203,14 +201,16 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDecimalStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils - .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); + densityAvgSum += + (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils + .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); } // reset everything pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = + NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); } aggregateData = null; } @@ -236,7 +236,8 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); } - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); + ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory + .getNumDistinctValueEstimator(newData.getBitVectors())); } if (length > 0) { // we have to set ndv @@ -246,16 +247,18 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDecimalStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils - .decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); + densityAvgSum += + (MetaStoreUtils.decimalToDouble(aggregateData.getHighValue()) - MetaStoreUtils + 
.decimalToDouble(aggregateData.getLowValue())) / aggregateData.getNumDVs(); } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithPartInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getDecimalStats().getNumDVs(),partNames.size(), css.size()); + LOG.info("Ndv estimatation for " + colName + " is " + + columnStatisticsData.getDecimalStats().getNumDVs() + ". # of partitions requested: " + + partNames.size() + ". # of partitions found: " + colStatsWithPartInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } @@ -265,13 +268,14 @@ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, int numPartsWithStats, Map adjustedIndexMap, Map adjustedStatsMap, double densityAvg) { int rightBorderInd = numParts; - DecimalColumnStatsDataInspector extrapolateDecimalData = new DecimalColumnStatsDataInspector(); + DecimalColumnStatsData extrapolateDecimalData = new DecimalColumnStatsData(); Map extractedAdjustedStatsMap = new HashMap<>(); for (Map.Entry entry : adjustedStatsMap.entrySet()) { extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDecimalStats()); } - List> list = new LinkedList>( - extractedAdjustedStatsMap.entrySet()); + List> list = + new LinkedList>( + extractedAdjustedStatsMap.entrySet()); // get the lowValue Collections.sort(list, new Comparator>() { @Override diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java index bb4a725..7f8db70 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/DoubleColumnStatsAggregator.java @@ -28,12 +28,11 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,36 +42,29 @@ private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithPartInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - - // check if all the ColumnStatisticsObjs contain stats and all the ndv are - // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); - NumDistinctValueEstimator 
ndvEstimator = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + // check if all the ColumnStatisticsObjs contain stats and all the ndv are bitvectors + NumDistinctValueEstimator ndvEstimator = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso .getStatsData().getSetField()); } - DoubleColumnStatsDataInspector doubleColumnStatsData = - (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats(); - if (doubleColumnStatsData.getNdvEstimator() == null) { + if (!cso.getStatsData().getDoubleStats().isSetBitVectors() + || cso.getStatsData().getDoubleStats().getBitVectors().length == 0) { ndvEstimator = null; break; } else { // check if all of the bit vectors can merge - NumDistinctValueEstimator estimator = doubleColumnStatsData.getNdvEstimator(); + NumDistinctValueEstimator estimator = NumDistinctValueEstimatorFactory + .getNumDistinctValueEstimator(cso.getStatsData().getDoubleStats().getBitVectors()); if (ndvEstimator == null) { ndvEstimator = estimator; } else { @@ -85,26 +77,27 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } + LOG.debug("Do we have stats for all partitions for " + colName + " : " + areAllPartsFound); if (ndvEstimator != null) { ndvEstimator = NumDistinctValueEstimatorFactory .getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { - DoubleColumnStatsDataInspector aggregateData = null; + if (areAllPartsFound || colStatsWithPartInfo.size() < 2) { + DoubleColumnStatsData aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DoubleColumnStatsDataInspector newData = - (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); higherBound += newData.getNumDVs(); densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); if (ndvEstimator != null) { - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); + ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory + .getNumDistinctValueEstimator(newData.getBitVectors())); } if (aggregateData == null) { aggregateData = newData.deepCopy(); @@ -154,9 +147,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
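Note on the bit-vector path above: with the DataInspector types gone, every aggregator in this patch rebuilds its NDV estimators from the serialized bit vectors via NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(...) and merges them with mergeEstimators(...). A minimal sketch of that path, using only factory/estimator calls that appear in these hunks; the helper class is hypothetical, and the canMerge guard is an assumption inferred from the "check if all of the bit vectors can merge" branches:

    import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
    import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory;

    public class NdvBitVectorMergeSketch {
      // Rebuild estimators from serialized bit vectors and merge them, mirroring
      // the mergeEstimators(getNumDistinctValueEstimator(newData.getBitVectors()))
      // calls in the aggregators above.
      public static long mergedNdv(byte[] bitVectorsA, byte[] bitVectorsB) {
        NumDistinctValueEstimator merged =
            NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(bitVectorsA);
        NumDistinctValueEstimator other =
            NumDistinctValueEstimatorFactory.getNumDistinctValueEstimator(bitVectorsB);
        if (merged.canMerge(other)) { // assumed guard: estimators must be compatible
          merged.mergeEstimators(other);
        }
        return merged.estimateNumDistinctValues();
      }
    }

Deserializing on every merge trades the removed per-object estimator cache for simpler data objects; the cost is one factory call per partition per column.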
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats(); if (useDensityFunctionForNDVEstimation) { densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); @@ -172,11 +165,10 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, int length = 0; int curIndex = -1; DoubleColumnStatsData aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - DoubleColumnStatsDataInspector newData = - (DoubleColumnStatsDataInspector) cso.getStatsData().getDoubleStats(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); + DoubleColumnStatsData newData = cso.getStatsData().getDoubleStats(); // newData.isSetBitVectors() should be true for sure because we // already checked it before. if (indexMap.get(partName) != curIndex) { @@ -189,13 +181,15 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDoubleStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } // reset everything pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = NumDistinctValueEstimatorFactory + .getEmptyNumDistinctValueEstimator(ndvEstimator); } aggregateData = null; } @@ -208,11 +202,12 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, aggregateData = newData.deepCopy(); } else { aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); - aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), - newData.getHighValue())); + aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData + .getHighValue())); aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); } - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); + ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory + .getNumDistinctValueEstimator(newData.getBitVectors())); } if (length > 0) { // we have to set ndv @@ -222,15 +217,17 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setDoubleStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithPartInfo.size(), + adjustedIndexMap, adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - 
LOG.debug("Ndv estimatation for {} is {}. # of partitions requested: {}. # of partitions found: {}", colName, - columnStatisticsData.getDoubleStats().getNumDVs(),partNames.size(), css.size()); + LOG.info("Ndv estimation for " + colName + " is " + + columnStatisticsData.getDoubleStats().getNumDVs() + ". # of partitions requested: " + + partNames.size() + ". # of partitions found: " + colStatsWithPartInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } @@ -240,7 +237,7 @@ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, int numPartsWithStats, Map adjustedIndexMap, Map adjustedStatsMap, double densityAvg) { int rightBorderInd = numParts; - DoubleColumnStatsDataInspector extrapolateDoubleData = new DoubleColumnStatsDataInspector(); + DoubleColumnStatsData extrapolateDoubleData = new DoubleColumnStatsData(); Map extractedAdjustedStatsMap = new HashMap<>(); for (Map.Entry entry : adjustedStatsMap.entrySet()) { extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getDoubleStats()); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java index 5b1145e..1ae3eb0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/LongColumnStatsAggregator.java @@ -28,51 +28,42 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class LongColumnStatsAggregator extends ColumnStatsAggregator implements IExtrapolatePartStatus { private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithPartInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - - // check if all the ColumnStatisticsObjs contain stats and all the ndv are - // bitvectors - boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); - NumDistinctValueEstimator ndvEstimator = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + // check if all the ColumnStatisticsObjs contain stats and all the ndv are bitvectors + NumDistinctValueEstimator ndvEstimator = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso
= csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso .getStatsData().getSetField()); } - LongColumnStatsDataInspector longColumnStatsData = - (LongColumnStatsDataInspector) cso.getStatsData().getLongStats(); - if (longColumnStatsData.getNdvEstimator() == null) { + if (!cso.getStatsData().getLongStats().isSetBitVectors() + || cso.getStatsData().getLongStats().getBitVectors().length == 0) { ndvEstimator = null; break; } else { // check if all of the bit vectors can merge - NumDistinctValueEstimator estimator = longColumnStatsData.getNdvEstimator(); + NumDistinctValueEstimator estimator = NumDistinctValueEstimatorFactory + .getNumDistinctValueEstimator(cso.getStatsData().getLongStats().getBitVectors()); if (ndvEstimator == null) { ndvEstimator = estimator; } else { @@ -85,26 +76,27 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } + LOG.debug("Do we have stats for all partitions for " + colName + " : " + areAllPartsFound); if (ndvEstimator != null) { ndvEstimator = NumDistinctValueEstimatorFactory .getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { - LongColumnStatsDataInspector aggregateData = null; + if (areAllPartsFound || colStatsWithPartInfo.size() < 2) { + LongColumnStatsData aggregateData = null; long lowerBound = 0; long higherBound = 0; double densityAvgSum = 0.0; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - LongColumnStatsDataInspector newData = - (LongColumnStatsDataInspector) cso.getStatsData().getLongStats(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + LongColumnStatsData newData = cso.getStatsData().getLongStats(); lowerBound = Math.max(lowerBound, newData.getNumDVs()); higherBound += newData.getNumDVs(); densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); if (ndvEstimator != null) { - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); + ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory + .getNumDistinctValueEstimator(newData.getBitVectors())); } if (aggregateData == null) { aggregateData = newData.deepCopy(); @@ -155,9 +147,9 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
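Note on the bounds bookkeeping above: lowerBound tracks the largest per-partition NDV (the aggregate can never have fewer distinct values than its largest partition) and higherBound the sum of per-partition NDVs (it can never have more), while densityAvgSum accumulates the average gap between distinct values. A worked sketch with made-up numbers; the final clamp of the density estimate into [lowerBound, higherBound] happens in code elided by the hunk context, so treat that step as an assumption:

    public class NdvBoundsSketch {
      public static void main(String[] args) {
        // Two hypothetical partitions: {lowValue, highValue, numDVs}.
        long[][] parts = { { 0, 100, 50 }, { 50, 200, 60 } };
        long lowerBound = 0;   // aggregate NDV is at least the max per-partition NDV
        long higherBound = 0;  // and at most the sum of per-partition NDVs
        double densityAvgSum = 0.0;
        for (long[] p : parts) {
          lowerBound = Math.max(lowerBound, p[2]);
          higherBound += p[2];
          densityAvgSum += (double) (p[1] - p[0]) / p[2]; // avg gap between distinct values
        }
        double densityAvg = densityAvgSum / parts.length;  // (2.0 + 2.5) / 2 = 2.25
        long densityNdv = (long) ((200 - 0) / densityAvg); // overall range / density = 88
        // Prints "60 <= 88 <= 110": the density estimate falls inside the bounds.
        System.out.println(lowerBound + " <= " + densityNdv + " <= " + higherBound);
      }
    }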
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); LongColumnStatsData newData = cso.getStatsData().getLongStats(); if (useDensityFunctionForNDVEstimation) { densityAvgSum += (newData.getHighValue() - newData.getLowValue()) / newData.getNumDVs(); @@ -172,12 +164,11 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, double pseudoIndexSum = 0; int length = 0; int curIndex = -1; - LongColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - LongColumnStatsDataInspector newData = - (LongColumnStatsDataInspector) cso.getStatsData().getLongStats(); + LongColumnStatsData aggregateData = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); + LongColumnStatsData newData = cso.getStatsData().getLongStats(); // newData.isSetBitVectors() should be true for sure because we // already checked it before. if (indexMap.get(partName) != curIndex) { @@ -190,13 +181,15 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setLongStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } // reset everything pseudoPartName = new StringBuilder(); pseudoIndexSum = 0; length = 0; - ndvEstimator = NumDistinctValueEstimatorFactory.getEmptyNumDistinctValueEstimator(ndvEstimator); + ndvEstimator = NumDistinctValueEstimatorFactory + .getEmptyNumDistinctValueEstimator(ndvEstimator); } aggregateData = null; } @@ -209,11 +202,12 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, aggregateData = newData.deepCopy(); } else { aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); - aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), - newData.getHighValue())); + aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData + .getHighValue())); aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); } - ndvEstimator.mergeEstimators(newData.getNdvEstimator()); + ndvEstimator.mergeEstimators(NumDistinctValueEstimatorFactory + .getNumDistinctValueEstimator(newData.getBitVectors())); } if (length > 0) { // we have to set ndv @@ -223,15 +217,18 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, csd.setLongStats(aggregateData); adjustedStatsMap.put(pseudoPartName.toString(), csd); if (useDensityFunctionForNDVEstimation) { - densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) / aggregateData.getNumDVs(); + densityAvgSum += (aggregateData.getHighValue() - aggregateData.getLowValue()) + / aggregateData.getNumDVs(); } } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithPartInfo.size(), + adjustedIndexMap, 
adjustedStatsMap, densityAvgSum / adjustedStatsMap.size()); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getLongStats().getNumDVs(),partNames.size(), css.size()); + LOG.info("Ndv estimation for " + colName + " is " + + columnStatisticsData.getLongStats().getNumDVs() + ". # of partitions requested: " + + partNames.size() + ". # of partitions found: " + colStatsWithPartInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } @@ -241,7 +238,7 @@ public void extrapolate(ColumnStatisticsData extrapolateData, int numParts, int numPartsWithStats, Map adjustedIndexMap, Map adjustedStatsMap, double densityAvg) { int rightBorderInd = numParts; - LongColumnStatsDataInspector extrapolateLongData = new LongColumnStatsDataInspector(); + LongColumnStatsData extrapolateLongData = new LongColumnStatsData(); Map extractedAdjustedStatsMap = new HashMap<>(); for (Map.Entry entry : adjustedStatsMap.entrySet()) { extractedAdjustedStatsMap.put(entry.getKey(), entry.getValue().getLongStats()); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java index 1b29f92..7822d8b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator; import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimatorFactory; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -43,31 +43,22 @@ private static final Logger LOG = LoggerFactory.getLogger(LongColumnStatsAggregator.class); @Override - public ColumnStatisticsObj aggregate(String colName, List partNames, - List css) throws MetaException { + public ColumnStatisticsObj aggregate(List colStatsWithPartInfo, + List partNames, boolean areAllPartsFound) throws MetaException { ColumnStatisticsObj statsObj = null; - - // check if all the ColumnStatisticsObjs contain stats and all the ndv are - // bitvectors. Only when both of the conditions are true, we merge bit - vectors. Otherwise, just use the maximum function. 
- boolean doAllPartitionContainStats = partNames.size() == css.size(); - LOG.debug("doAllPartitionContainStats for " + colName + " is " + doAllPartitionContainStats); NumDistinctValueEstimator ndvEstimator = null; String colType = null; - for (ColumnStatistics cs : css) { - if (cs.getStatsObjSize() != 1) { - throw new MetaException( - "The number of columns should be exactly one in aggrStats, but found " - + cs.getStatsObjSize()); - } - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + String colName = null; + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); if (statsObj == null) { + colName = cso.getColName(); colType = cso.getColType(); statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(colName, colType, cso .getStatsData().getSetField()); } - StringColumnStatsDataInspector stringColumnStatsData = - (StringColumnStatsDataInspector) cso.getStatsData().getStringStats(); + StringColumnStatsDataInspector stringColumnStatsData = (StringColumnStatsDataInspector) cso + .getStatsData().getStringStats(); if (stringColumnStatsData.getNdvEstimator() == null) { ndvEstimator = null; break; @@ -86,18 +77,19 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, } } } + LOG.debug("Do we have stats for all partitions for " + colName + " : " + areAllPartsFound); if (ndvEstimator != null) { ndvEstimator = NumDistinctValueEstimatorFactory .getEmptyNumDistinctValueEstimator(ndvEstimator); } LOG.debug("all of the bit vectors can merge for " + colName + " is " + (ndvEstimator != null)); ColumnStatisticsData columnStatisticsData = new ColumnStatisticsData(); - if (doAllPartitionContainStats || css.size() < 2) { + if (areAllPartsFound || colStatsWithPartInfo.size() < 2) { StringColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - StringColumnStatsDataInspector newData = - (StringColumnStatsDataInspector) cso.getStatsData().getStringStats(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + StringColumnStatsDataInspector newData = (StringColumnStatsDataInspector) cso + .getStatsData().getStringStats(); if (ndvEstimator != null) { ndvEstimator.mergeEstimators(newData.getNdvEstimator()); } @@ -134,9 +126,10 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (ndvEstimator == null) { // if not every partition uses bitvector for ndv, we just fall back to // the traditional extrapolation methods. 
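Note on ColStatsObjWithSourceInfo: per the import hunks, the new carrier type is a nested class of MetaStoreUtils. Only getColStatsObj() and getPartName() are exercised in these aggregators; the sketch below reconstructs a plausible shape (named ...Sketch to mark it as illustrative, with the db/table accessors assumed from the "SourceInfo" naming):

    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;

    // Illustrative reconstruction only; the real class is
    // MetaStoreUtils.ColStatsObjWithSourceInfo.
    public class ColStatsObjWithSourceInfoSketch {
      private final ColumnStatisticsObj colStatsObj;
      private final String dbName;   // assumed accessor, implied by "SourceInfo"
      private final String tblName;  // assumed accessor, implied by "SourceInfo"
      private final String partName;

      public ColStatsObjWithSourceInfoSketch(ColumnStatisticsObj colStatsObj,
          String dbName, String tblName, String partName) {
        this.colStatsObj = colStatsObj;
        this.dbName = dbName;
        this.tblName = tblName;
        this.partName = partName;
      }

      // The two accessors the aggregator hunks actually call:
      public ColumnStatisticsObj getColStatsObj() { return colStatsObj; }
      public String getPartName() { return partName; }

      public String getDbName() { return dbName; }
      public String getTblName() { return tblName; }
    }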
- for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); adjustedIndexMap.put(partName, (double) indexMap.get(partName)); adjustedStatsMap.put(partName, cso.getStatsData()); } @@ -148,11 +141,11 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, double pseudoIndexSum = 0; int length = 0; int curIndex = -1; StringColumnStatsDataInspector aggregateData = null; - for (ColumnStatistics cs : css) { - String partName = cs.getStatsDesc().getPartName(); - ColumnStatisticsObj cso = cs.getStatsObjIterator().next(); - StringColumnStatsDataInspector newData = - (StringColumnStatsDataInspector) cso.getStatsData().getStringStats(); + for (ColStatsObjWithSourceInfo csp : colStatsWithPartInfo) { + ColumnStatisticsObj cso = csp.getColStatsObj(); + String partName = csp.getPartName(); + StringColumnStatsDataInspector newData = (StringColumnStatsDataInspector) cso + .getStatsData().getStringStats(); // newData.isSetBitVectors() should be true for sure because we // already checked it before. if (indexMap.get(partName) != curIndex) { @@ -181,10 +174,10 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, if (aggregateData == null) { aggregateData = newData.deepCopy(); } else { - aggregateData.setAvgColLen(Math.min(aggregateData.getAvgColLen(), - newData.getAvgColLen())); - aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), - newData.getMaxColLen())); + aggregateData.setAvgColLen(Math.min(aggregateData.getAvgColLen(), newData + .getAvgColLen())); + aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData + .getMaxColLen())); aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); } ndvEstimator.mergeEstimators(newData.getNdvEstimator()); @@ -198,11 +191,12 @@ public ColumnStatisticsObj aggregate(String colName, List partNames, adjustedStatsMap.put(pseudoPartName.toString(), csd); } } - extrapolate(columnStatisticsData, partNames.size(), css.size(), adjustedIndexMap, - adjustedStatsMap, -1); + extrapolate(columnStatisticsData, partNames.size(), colStatsWithPartInfo.size(), + adjustedIndexMap, adjustedStatsMap, -1); } - LOG.debug("Ndv estimatation for {} is {} # of partitions requested: {} # of partitions found: {}", colName, - columnStatisticsData.getStringStats().getNumDVs(),partNames.size(), css.size()); + LOG.info("Ndv estimation for " + colName + " is " + + columnStatisticsData.getStringStats().getNumDVs() + ". # of partitions requested: " + + partNames.size() + ". 
# of partitions found: " + colStatsWithPartInfo.size()); statsObj.setStatsData(columnStatisticsData); return statsObj; } @@ -279,7 +273,7 @@ public int compare(Map.Entry o1, @Override public int compare(Map.Entry o1, Map.Entry o2) { - return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs()); + return Long.compare(o1.getValue().getNumDVs(), o2.getValue().getNumDVs()); } }); minInd = adjustedIndexMap.get(list.get(0).getKey()); diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index 4db203d..f69c2b1 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; @@ -927,4 +928,11 @@ public void dropConstraint(String dbName, String tableName, public String getMetastoreDbUuid() throws MetaException { throw new MetaException("Get metastore uuid is not implemented"); } + + @Override + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; + } } diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index fb16cfc..e3b4eba 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; @@ -943,4 +944,11 @@ public void dropConstraint(String dbName, String tableName, public String getMetastoreDbUuid() throws MetaException { throw new MetaException("Get metastore uuid is not implemented"); } + + @Override + public List getPartitionColStatsForDatabase(String dbName) + throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; + } }
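A closing note on the test doubles: both Dummy RawStore implementations return null from the new getPartitionColStatsForDatabase, so any caller that iterates the result without a null check will throw a NullPointerException. If an inert stub is the intent, an empty list is the safer default; a minimal sketch, assuming the erased List in these hunks is List<ColStatsObjWithSourceInfo>:

    // For DummyRawStoreControlledCommit / DummyRawStoreForJdoConnection; assumes
    // java.util.Collections is imported alongside the existing imports.
    @Override
    public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
        throws MetaException, NoSuchObjectException {
      return Collections.emptyList(); // inert stub: no partition column stats to report
    }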