diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index b96c27e..b502cc8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -1367,7 +1367,7 @@ private long partsFoundForPartitions(final String dbName, final String tableName
         + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
         + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
         + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\""
-        + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+        + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?";
     long start = 0;
     long end = 0;
     Query query = null;
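Reviewer note: with the GROUP BY clause removed, the statement above returns a single aggregate row computed over all PART_COL_STATS rows of the table, rather than one row per (partition, column, type) group. For context, here is a minimal sketch of how a direct-SQL aggregate like this is typically executed through JDO; the query text is abbreviated and the PersistenceManager is assumed to be supplied by the caller. This is not code from the patch.

    import javax.jdo.PersistenceManager;
    import javax.jdo.Query;

    public class DirectSqlAggregateSketch {
      // Runs a positional-parameter SQL aggregate the way MetaStoreDirectSql-style
      // code typically does: a JDO SQL query bound with executeWithArray.
      public Object run(PersistenceManager pm, String dbName, String tableName) {
        String queryText = "select sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\""
            + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?";
        Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
        try {
          // Parameters bind in order: the first ? is DB_NAME, the second TABLE_NAME.
          return query.executeWithArray(dbName, tableName);
        } finally {
          query.closeAll();
        }
      }
    }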
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 39b1676..4f98718 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -41,18 +41,10 @@
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.Decimal;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -61,7 +53,6 @@
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -77,13 +68,14 @@
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMerger;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMergerFactory;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -104,8 +96,6 @@
 // TODO initial load slow?
 // TODO size estimation
 // TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
-// TODO factor in NDV estimation (density based estimation) logic when merging NDVs from 2 colStats object
-// TODO refactor to use same common code with StatObjectConverter (for merging 2 col stats objects)
 public class CachedStore implements RawStore, Configurable {
   private static ScheduledExecutorService cacheUpdateMaster = null;
@@ -231,8 +221,8 @@ private void prewarm() throws Exception {
           SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
               HiveStringUtils.normalizeIdentifier(tblName), partition);
         }
-        Map<String, ColumnStatisticsObj> aggrStatsPerPartition = rawStore
-            .getAggrColStatsForTablePartitions(dbName, tblName);
+        Map<String, ColumnStatisticsObj> aggrStatsPerPartition =
+            rawStore.getAggrColStatsForTablePartitions(dbName, tblName);
         SharedCache.addPartitionColStatsToCache(aggrStatsPerPartition);
       }
     }
@@ -318,7 +308,7 @@ private boolean updateTables(RawStore rawStore, List<String> dbNames) {
         Table table = rawStore.getTable(dbName, tblName);
         tables.add(table);
       }
-      // Update the cached database objects
+      // Update the cached table objects
       SharedCache.refreshTables(dbName, tables);
       for (String tblName : tblNames) {
         // If a preemption of this thread was requested, simply return before proceeding
@@ -327,8 +317,19 @@ private boolean updateTables(RawStore rawStore, List<String> dbNames) {
         }
         List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+        // Update the cached partition objects for this table
         SharedCache.refreshPartitions(dbName, tblName, partitions);
       }
+      for (String tblName : tblNames) {
+        // If a preemption of this thread was requested, simply return before proceeding
+        if (Thread.interrupted()) {
+          return false;
+        }
+        // Update the cached partition col stats for this table
+        Map<String, ColumnStatisticsObj> aggrStatsPerPartition =
+            rawStore.getAggrColStatsForTablePartitions(dbName, tblName);
+        SharedCache.refreshPartitionColStats(aggrStatsPerPartition);
+      }
     } catch (MetaException | NoSuchObjectException e) {
       LOG.error("Updating CachedStore: unable to read table", e);
       return false;
     }
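Reviewer note: updateTables now refreshes in three passes (tables, partitions, then partition column stats) and re-polls Thread.interrupted() between items, so a concurrent metastore write, which calls interruptCacheUpdateMaster(), can preempt a stale refresh before it publishes. A compact standalone model of that cooperative-preemption pattern follows; the names are illustrative, not Hive code.

    import java.util.Arrays;
    import java.util.List;

    public class PreemptibleRefreshSketch {
      // One refresh phase: poll the interrupt flag before each unit of work so a
      // writer can cancel the cycle instead of waiting behind it.
      static boolean refreshPhase(List<String> tblNames) {
        for (String tblName : tblNames) {
          if (Thread.interrupted()) {
            return false; // preempted; caller abandons this cycle
          }
          // ... read one table's metadata from the raw store and publish it ...
        }
        return true;
      }

      public static void main(String[] args) {
        System.out.println(refreshPhase(Arrays.asList("t1", "t2"))); // true
      }
    }

Note that Thread.interrupted() also clears the interrupt flag, so the next scheduled cycle starts fresh after a preemption.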
@@ -472,6 +473,7 @@ public void createTable(Table tbl)
   }
 
   @Override
+  // TODO: drop table stats
   public boolean dropTable(String dbName, String tableName)
       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
@@ -1091,6 +1093,7 @@ public Partition getPartitionWithAuth(String dbName, String tblName,
   }
 
   @Override
+  // TODO: update table col stats cache
   public boolean updateTableColumnStatistics(ColumnStatistics colStats)
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
@@ -1103,13 +1106,14 @@ public boolean updateTableColumnStatistics(ColumnStatistics colStats)
   }
 
   @Override
-  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats,
-      List<String> partVals) throws NoSuchObjectException, MetaException,
-      InvalidObjectException, InvalidInputException {
+  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
     if (succ) {
-      SharedCache.updatePartitionColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
-          HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, colStats.getStatsObj());
+      SharedCache.updatePartitionColumnStatistics(
+          HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
+          HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals,
+          colStats.getStatsObj());
     }
     return succ;
   }
@@ -1122,21 +1126,28 @@ public ColumnStatistics getTableColumnStatistics(String dbName,
   }
 
   @Override
-  public List<ColumnStatistics> getPartitionColumnStatistics(String dbName,
-      String tblName, List<String> partNames, List<String> colNames)
-      throws MetaException, NoSuchObjectException {
+  // TODO: calculate from cached values.
+  // Need to see if it makes sense to do this as some col stats may be out of date/missing on cache.
+  public List<ColumnStatistics> getPartitionColumnStatistics(String dbName, String tblName,
+      List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
     return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
   }
 
   @Override
-  public boolean deletePartitionColumnStatistics(String dbName,
-      String tableName, String partName, List<String> partVals, String colName)
-      throws NoSuchObjectException, MetaException, InvalidObjectException,
-      InvalidInputException {
-    return rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+  public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName,
+      List<String> partVals, String colName) throws NoSuchObjectException, MetaException,
+      InvalidObjectException, InvalidInputException {
+    boolean succ =
+        rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+    if (succ) {
+      SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+          HiveStringUtils.normalizeIdentifier(tableName), partVals, colName);
+    }
+    return succ;
   }
 
   @Override
+  // TODO: implement this
   public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName)
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
@@ -1206,8 +1217,8 @@ public void setMetaStoreSchemaVersion(String version, String comment)
   }
 
   @Override
-  public void dropPartitions(String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
+  public void dropPartitions(String dbName, String tblName, List<String> partNames)
+      throws MetaException, NoSuchObjectException {
     rawStore.dropPartitions(dbName, tblName, partNames);
     interruptCacheUpdateMaster();
     for (String partName : partNames) {
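Reviewer note: the mutating paths above follow one write-through convention: commit to rawStore first, and mirror the change into SharedCache only when the store reports success, using normalized (lower-cased) identifiers for the cache key. A minimal standalone model of that convention, with a stand-in Store interface and a hypothetical key scheme, neither of which is a Hive class:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class WriteThroughSketch {
      interface Store {
        boolean delete(String key); // stand-in for a RawStore mutation
      }

      private final Store rawStore;
      private final Map<String, Object> cache = new ConcurrentHashMap<>();

      WriteThroughSketch(Store rawStore) {
        this.rawStore = rawStore;
      }

      boolean delete(String dbName, String tableName) {
        // Normalize identifiers the same way for writes and lookups.
        String key = (dbName + "#" + tableName).toLowerCase();
        boolean succ = rawStore.delete(key); // source of truth goes first
        if (succ) {
          cache.remove(key); // mirror only after the store commits
        }
        return succ; // on failure the cache is left untouched
      }
    }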
@@ -1339,113 +1350,21 @@ private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
       List<String> partNames, String colName) throws MetaException {
     ColumnStatisticsObj colStats = null;
     for (String partName : partNames) {
-      String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
-      ColumnStatisticsObj colStatsForPart = SharedCache.getCachedPartitionColStats(
-          colStatsCacheKey);
+      String colStatsCacheKey =
+          CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
+      ColumnStatisticsObj colStatsForPart =
+          SharedCache.getCachedPartitionColStats(colStatsCacheKey);
       if (colStats == null) {
         colStats = colStatsForPart;
       } else {
-        colStats = mergeColStatsObj(colStats, colStatsForPart);
+        ColumnStatsMerger merger =
+            ColumnStatsMergerFactory.getColumnStatsMerger(colStats, colStatsForPart);
+        merger.merge(colStats, colStatsForPart);
       }
     }
     return colStats;
   }
 
-  private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj colStats1,
-      ColumnStatisticsObj colStats2) throws MetaException {
-    if ((!colStats1.getColType().equalsIgnoreCase(colStats2.getColType()))
-        && (!colStats1.getColName().equalsIgnoreCase(colStats2.getColName()))) {
-      throw new MetaException("Can't merge column stats for two partitions for different columns.");
-    }
-    ColumnStatisticsData csd = new ColumnStatisticsData();
-    ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(),
-        colStats1.getColType(), csd);
-    ColumnStatisticsData csData1 = colStats1.getStatsData();
-    ColumnStatisticsData csData2 = colStats2.getStatsData();
-    String colType = colStats1.getColType().toLowerCase();
-    if (colType.equals("boolean")) {
-      BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
-      boolStats.setNumFalses(csData1.getBooleanStats().getNumFalses()
-          + csData2.getBooleanStats().getNumFalses());
-      boolStats.setNumTrues(csData1.getBooleanStats().getNumTrues()
-          + csData2.getBooleanStats().getNumTrues());
-      boolStats.setNumNulls(csData1.getBooleanStats().getNumNulls()
-          + csData2.getBooleanStats().getNumNulls());
-      csd.setBooleanStats(boolStats);
-    } else if (colType.equals("string") || colType.startsWith("varchar")
-        || colType.startsWith("char")) {
-      StringColumnStatsData stringStats = new StringColumnStatsData();
-      stringStats.setNumNulls(csData1.getStringStats().getNumNulls()
-          + csData2.getStringStats().getNumNulls());
-      stringStats.setAvgColLen(Math.max(csData1.getStringStats().getAvgColLen(), csData2
-          .getStringStats().getAvgColLen()));
-      stringStats.setMaxColLen(Math.max(csData1.getStringStats().getMaxColLen(), csData2
-          .getStringStats().getMaxColLen()));
-      stringStats.setNumDVs(Math.max(csData1.getStringStats().getNumDVs(), csData2.getStringStats()
-          .getNumDVs()));
-      csd.setStringStats(stringStats);
-    } else if (colType.equals("binary")) {
-      BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
-      binaryStats.setNumNulls(csData1.getBinaryStats().getNumNulls()
-          + csData2.getBinaryStats().getNumNulls());
-      binaryStats.setAvgColLen(Math.max(csData1.getBinaryStats().getAvgColLen(), csData2
-          .getBinaryStats().getAvgColLen()));
-      binaryStats.setMaxColLen(Math.max(csData1.getBinaryStats().getMaxColLen(), csData2
-          .getBinaryStats().getMaxColLen()));
-      csd.setBinaryStats(binaryStats);
-    } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint")
-        || colType.equals("tinyint") || colType.equals("timestamp")) {
-      LongColumnStatsData longStats = new LongColumnStatsData();
-      longStats.setNumNulls(csData1.getLongStats().getNumNulls()
-          + csData2.getLongStats().getNumNulls());
-      longStats.setHighValue(Math.max(csData1.getLongStats().getHighValue(), csData2.getLongStats()
-          .getHighValue()));
-      longStats.setLowValue(Math.min(csData1.getLongStats().getLowValue(), csData2.getLongStats()
-          .getLowValue()));
-      longStats.setNumDVs(Math.max(csData1.getLongStats().getNumDVs(), csData2.getLongStats()
-          .getNumDVs()));
-      csd.setLongStats(longStats);
-    } else if (colType.equals("date")) {
-      DateColumnStatsData dateStats = new DateColumnStatsData();
-      dateStats.setNumNulls(csData1.getDateStats().getNumNulls()
-          + csData2.getDateStats().getNumNulls());
-      dateStats.setHighValue(new Date(Math.max(csData1.getDateStats().getHighValue()
-          .getDaysSinceEpoch(), csData2.getDateStats().getHighValue().getDaysSinceEpoch())));
-      dateStats.setHighValue(new Date(Math.min(csData1.getDateStats().getLowValue()
-          .getDaysSinceEpoch(), csData2.getDateStats().getLowValue().getDaysSinceEpoch())));
-      dateStats.setNumDVs(Math.max(csData1.getDateStats().getNumDVs(), csData2.getDateStats()
-          .getNumDVs()));
-      csd.setDateStats(dateStats);
-    } else if (colType.equals("double") || colType.equals("float")) {
-      DoubleColumnStatsData doubleStats = new DoubleColumnStatsData();
-      doubleStats.setNumNulls(csData1.getDoubleStats().getNumNulls()
-          + csData2.getDoubleStats().getNumNulls());
-      doubleStats.setHighValue(Math.max(csData1.getDoubleStats().getHighValue(), csData2
-          .getDoubleStats().getHighValue()));
-      doubleStats.setLowValue(Math.min(csData1.getDoubleStats().getLowValue(), csData2
-          .getDoubleStats().getLowValue()));
-      doubleStats.setNumDVs(Math.max(csData1.getDoubleStats().getNumDVs(), csData2.getDoubleStats()
-          .getNumDVs()));
-      csd.setDoubleStats(doubleStats);
-    } else if (colType.startsWith("decimal")) {
-      DecimalColumnStatsData decimalStats = new DecimalColumnStatsData();
-      decimalStats.setNumNulls(csData1.getDecimalStats().getNumNulls()
-          + csData2.getDecimalStats().getNumNulls());
-      Decimal high = (csData1.getDecimalStats().getHighValue()
-          .compareTo(csData2.getDecimalStats().getHighValue()) > 0) ? csData1.getDecimalStats()
-          .getHighValue() : csData2.getDecimalStats().getHighValue();
-      decimalStats.setHighValue(high);
-      Decimal low = (csData1.getDecimalStats().getLowValue()
-          .compareTo(csData2.getDecimalStats().getLowValue()) < 0) ? csData1.getDecimalStats()
-          .getLowValue() : csData2.getDecimalStats().getLowValue();
-      decimalStats.setLowValue(low);
-      decimalStats.setNumDVs(Math.max(csData1.getDecimalStats().getNumDVs(), csData2
-          .getDecimalStats().getNumDVs()));
-      csd.setDecimalStats(decimalStats);
-    }
-    return cso;
-  }
-
   @Override
   public NotificationEventResponse getNextNotification(
       NotificationEventRequest rqst) {
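Reviewer note: the hand-rolled mergeColStatsObj removed above is replaced by the merger hierarchy shared with the HBase metastore, which is presumably why the two merging TODOs at the top of the file were dropped. A sketch of the new call shape follows; the factory and merge signatures mirror their use in this patch, while the long-column stats values are fabricated purely for illustration.

    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
    import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMerger;
    import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMergerFactory;

    public class MergerCallSketch {
      static ColumnStatisticsObj longStats(long low, long high, long numNulls, long ndv) {
        LongColumnStatsData longData = new LongColumnStatsData();
        longData.setLowValue(low);
        longData.setHighValue(high);
        longData.setNumNulls(numNulls);
        longData.setNumDVs(ndv);
        ColumnStatisticsData data = new ColumnStatisticsData();
        data.setLongStats(longData);
        return new ColumnStatisticsObj("id", "bigint", data);
      }

      public static void main(String[] args) {
        ColumnStatisticsObj aggr = longStats(0, 50, 1, 10);
        ColumnStatisticsObj next = longStats(20, 90, 2, 25);
        // The factory selects a type-specific merger; merge() folds its second
        // argument into its first, so "aggr" accumulates across partitions.
        ColumnStatsMerger merger = ColumnStatsMergerFactory.getColumnStatsMerger(aggr, next);
        merger.merge(aggr, next);
        System.out.println(aggr.getStatsData().getLongStats());
      }
    }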
@@ -1562,9 +1481,8 @@ public void addForeignKeys(List<SQLForeignKey> fks)
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(
-      String dbName, String tableName)
-      throws MetaException, NoSuchObjectException {
+  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
     return rawStore.getAggrColStatsForTablePartitions(dbName, tableName);
   }
 
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 7beee42..781b22e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -21,8 +21,10 @@
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -44,9 +46,14 @@ public class SharedCache {
   private static Map<String, Database> databaseCache = new TreeMap<String, Database>();
   private static Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>();
-  private static Map<String, PartitionWrapper> partitionCache = new TreeMap<String, PartitionWrapper>();
-  private static Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<String, ColumnStatisticsObj>();
-  private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
+  private static Map<String, PartitionWrapper> partitionCache =
+      new TreeMap<String, PartitionWrapper>();
+  private static Map<String, ColumnStatisticsObj> partitionColStatsCache =
+      new TreeMap<String, ColumnStatisticsObj>();
+  private static Map<String, ColumnStatisticsObj> tableColStatsCache =
+      new TreeMap<String, ColumnStatisticsObj>();
+  private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache =
+      new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
   private static MessageDigest md;
 
   static {
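Reviewer note: sdCache deduplicates storage descriptors shared by many partitions; each wrapper holds a digest (sdHash), and increSd/decrSd keep a reference count per digest. A self-contained model of the scheme follows. ByteArrayWrapper is re-implemented here because byte[] has identity equals/hashCode, and all names are illustrative rather than the actual Hive classes.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class SdRefCountSketch {
      // byte[] keys need a wrapper to get value-based equality in a map.
      static final class ByteArrayWrapper {
        final byte[] bytes;
        ByteArrayWrapper(byte[] bytes) { this.bytes = bytes; }
        @Override public boolean equals(Object o) {
          return o instanceof ByteArrayWrapper && Arrays.equals(bytes, ((ByteArrayWrapper) o).bytes);
        }
        @Override public int hashCode() { return Arrays.hashCode(bytes); }
      }

      private final Map<ByteArrayWrapper, int[]> refCounts = new HashMap<>();

      void incrSd(byte[] sdHash) {
        refCounts.computeIfAbsent(new ByteArrayWrapper(sdHash), k -> new int[1])[0]++;
      }

      void decrSd(byte[] sdHash) {
        ByteArrayWrapper key = new ByteArrayWrapper(sdHash);
        int[] count = refCounts.get(key);
        if (count != null && --count[0] == 0) {
          refCounts.remove(key); // last referent gone: drop the shared descriptor
        }
      }

      public static void main(String[] args) throws Exception {
        byte[] hash = MessageDigest.getInstance("MD5").digest(
            "serialized-storage-descriptor".getBytes(StandardCharsets.UTF_8));
        SdRefCountSketch cache = new SdRefCountSketch();
        cache.incrSd(hash);
        cache.decrSd(hash);
        System.out.println(cache.refCounts.isEmpty()); // true
      }
    }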
@@ -119,6 +126,7 @@ public static synchronized void removeTableFromCache(String dbName, String tblName) {
     if (sdHash!=null) {
       decrSd(sdHash);
     }
+    // TODO: Remove table column stats from the cache
   }
 
   public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
@@ -214,14 +222,37 @@ public static synchronized boolean existPartitionFromCache(String dbName, String tblName,
     return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
   }
 
-  public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List<String> part_vals) {
-    PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
-    if (wrapper.getSdHash()!=null) {
+  public static synchronized Partition removePartitionFromCache(String dbName, String tblName,
+      List<String> part_vals) {
+    PartitionWrapper wrapper =
+        partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
+    if (wrapper.getSdHash() != null) {
       decrSd(wrapper.getSdHash());
     }
+    // Remove cached column stats if they exist for this partition
+    removePartitionColStatsFromCache(dbName, tblName, part_vals);
     return wrapper.getPartition();
   }
 
+  public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVals) {
+    String partialKey = CacheUtils.buildKey(dbName, tblName, partVals);
+    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+        partitionColStatsCache.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<String, ColumnStatisticsObj> entry = iterator.next();
+      String key = entry.getKey();
+      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+        iterator.remove();
+      }
+    }
+  }
+
+  public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVals, String colName) {
+    partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
+  }
+
   public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
     List<Partition> partitions = new ArrayList<Partition>();
     int count = 0;
@@ -245,13 +276,21 @@ public static synchronized void alterPartitionInCache(String dbName, String tblName,
   public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName,
       List<String> partVals, List<ColumnStatisticsObj> statsObjs) {
     Partition part = getPartitionFromCache(dbName, tableName, partVals);
-    part.getSd().getParameters();
     List<String> colNames = new ArrayList<>();
     for (ColumnStatisticsObj statsObj:statsObjs) {
       colNames.add(statsObj.getColName());
     }
     StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
     alterPartitionInCache(dbName, tableName, partVals, part);
+    updatePartitionColStatsInCache(dbName, tableName, partVals, statsObjs);
+  }
+
+  public static synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
+      List<String> partVals, List<ColumnStatisticsObj> statsObjs) {
+    for (ColumnStatisticsObj statsObj : statsObjs) {
+      partitionColStatsCache.put(
+          CacheUtils.buildKey(dbName, tableName, partVals, statsObj.getColName()), statsObj);
+    }
   }
 
   public static synchronized int getCachedPartitionCount() {
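Reviewer note: the two-argument removePartitionColStatsFromCache drops every per-column entry of a partition by prefix-matching cache keys, which costs one pass over the whole partitionColStatsCache. A standalone model of that prefix delete follows; the '#'-joined key format is an assumption for illustration, since the patch delegates to CacheUtils.buildKey.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class PrefixDeleteSketch {
      private final Map<String, Object> colStatsCache = new TreeMap<>();

      // Hypothetical key encoding; the real patch delegates to CacheUtils.buildKey.
      static String buildKey(String db, String tbl, List<String> partVals, String col) {
        return (db + "#" + tbl + "#" + String.join("#", partVals) + "#" + col).toLowerCase();
      }

      void removePartitionColStats(String db, String tbl, List<String> partVals) {
        String prefix = (db + "#" + tbl + "#" + String.join("#", partVals)).toLowerCase();
        Iterator<Map.Entry<String, Object>> it = colStatsCache.entrySet().iterator();
        while (it.hasNext()) { // linear scan over all cached entries, as in the patch
          if (it.next().getKey().startsWith(prefix)) {
            it.remove();
          }
        }
      }

      public static void main(String[] args) {
        PrefixDeleteSketch cache = new PrefixDeleteSketch();
        cache.colStatsCache.put(buildKey("db", "tbl", Arrays.asList("ds=1"), "c1"), new Object());
        cache.colStatsCache.put(buildKey("db", "tbl", Arrays.asList("ds=2"), "c1"), new Object());
        cache.removePartitionColStats("db", "tbl", Arrays.asList("ds=1"));
        System.out.println(cache.colStatsCache.size()); // 1: only ds=2 remains
      }
    }

Because the cache is a TreeMap, the same delete could also walk only the sorted key range starting at the prefix (for example via tailMap), avoiding the scan over unrelated entries.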
@@ -262,10 +301,18 @@ public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
     return partitionColStatsCache.get(key);
   }
 
-  public static synchronized void addPartitionColStatsToCache(Map<String, ColumnStatisticsObj> aggrStatsPerPartition) {
+  public static synchronized void addPartitionColStatsToCache(
+      Map<String, ColumnStatisticsObj> aggrStatsPerPartition) {
     partitionColStatsCache.putAll(aggrStatsPerPartition);
   }
 
+  public static synchronized void refreshPartitionColStats(
+      Map<String, ColumnStatisticsObj> newAggrStatsPerPartition) {
+    for (Map.Entry<String, ColumnStatisticsObj> entry : newAggrStatsPerPartition.entrySet()) {
+      partitionColStatsCache.remove(entry.getKey());
+    }
+    addPartitionColStatsToCache(newAggrStatsPerPartition);
+  }
   public static void increSd(StorageDescriptor sd, byte[] sdHash) {
     ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
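Reviewer note: refreshPartitionColStats overwrites whatever keys the new snapshot covers and leaves all other keys untouched; entries for partitions that have disappeared appear to rely on the drop paths above for cleanup rather than on the refresh itself. A small model of that behavior, with a plain TreeMap standing in for the cache and fabricated keys and values:

    import java.util.Map;
    import java.util.TreeMap;

    public class RefreshSemanticsSketch {
      public static void main(String[] args) {
        Map<String, Integer> cache = new TreeMap<>();
        cache.put("db#tbl#ds=1#c", 1);
        cache.put("db#tbl#ds=2#c", 2);

        // Snapshot from the raw store covers only ds=1.
        Map<String, Integer> snapshot = new TreeMap<>();
        snapshot.put("db#tbl#ds=1#c", 9);

        for (Map.Entry<String, Integer> entry : snapshot.entrySet()) {
          cache.remove(entry.getKey()); // mirrors the patch's explicit remove pass
        }
        cache.putAll(snapshot);

        // ds=1 is refreshed to 9; ds=2 survives untouched.
        System.out.println(cache); // {db#tbl#ds=1#c=9, db#tbl#ds=2#c=2}
      }
    }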