diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index bd25bc7cad..d3ede594bc 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -900,6 +900,17 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "hive.metastore.cached.rawstore.cache.update.frequency", "60", new TimeValidator( TimeUnit.SECONDS), "The time after which metastore cache is updated from metastore DB."), + METASTORE_CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST( + "hive.metastore.cached.rawstore.cached.object.whitelist", "*", "Comma separated list of regular expressions \n " + + "to select the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" + + "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.blacklist. \n" + + "Example: *, db1*, db2\\.tbl*. The last item can potentially override patterns specified before."), + METASTORE_CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST( + "hive.metastore.cached.rawstore.cached.object.blacklist", "", "Comma separated list of regular expressions \n " + + "to filter out the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" + + "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.whitelist. \n" + + "Example: db2*, db3\\.tbl1, db3\\.*. The last item can potentially override patterns specified before. \n" + + "The blacklist also overrides the whitelist."), METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl", "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler", "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " + diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index c61f27b326..ff0e7d3c67 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -31,6 +32,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -124,6 +127,8 @@ private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( true); private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); + private static List whitelistPatterns = new ArrayList(); + private static List blacklistPatterns = new ArrayList(); private RawStore rawStore = null; private Configuration conf; private PartitionExpressionProxy expressionProxy = null; @@ -283,6 +288,8 @@ static void prewarm(RawStore rawStore) throws Exception { } } } + // Notify all blocked threads that prewarm is complete now + sharedCacheWrapper.notifyAllBlocked(); } @VisibleForTesting @@ -354,9 +361,12 @@ public void run() { if (isFirstRun) { while (isFirstRun) { try { + long startTime = System.nanoTime(); LOG.info("Prewarming CachedStore"); prewarm(rawStore); LOG.info("CachedStore initialized"); + long endTime = System.nanoTime(); + LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms"); } catch (Exception e) { LOG.error("Prewarm failure", e); sharedCacheWrapper.updateInitState(e, false); @@ -384,6 +394,9 @@ public void update() { updateTables(rawStore, dbName); List tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe()); for (String tblName : tblNames) { + if (!shouldCacheTable(dbName, tblName)) { + continue; + } // Update the partitions for a table in cache updateTablePartitions(rawStore, dbName, tblName); // Update the table column stats for a table in cache @@ -433,6 +446,9 @@ private void updateTables(RawStore rawStore, String dbName) { try { List tblNames = rawStore.getAllTables(dbName); for (String tblName : tblNames) { + if (!shouldCacheTable(dbName, tblName)) { + continue; + } Table table = rawStore.getTable(StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName)); @@ -576,7 +592,9 @@ public void rollbackTransaction() { public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock()) { + return; + } try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); @@ -593,7 +611,7 @@ public Database getDatabase(String dbName) throws NoSuchObjectException { SharedCache sharedCache; try { sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized()) { return rawStore.getDatabase(dbName); } } catch (MetaException e) { @@ -611,7 +629,9 @@ public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaExc boolean succ = rawStore.dropDatabase(dbname); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock()) { + return succ; + } try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); @@ -630,7 +650,9 @@ public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectExce boolean succ = rawStore.alterDatabase(dbName, db); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock()) { + return succ; + } try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); @@ -646,7 +668,7 @@ public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectExce @Override public List getDatabases(String pattern) throws MetaException { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock()) { return rawStore.getDatabases(pattern); } List results = new ArrayList<>(); @@ -662,7 +684,7 @@ public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectExce @Override public List getAllDatabases() throws MetaException { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock()) { return rawStore.getAllDatabases(); } return sharedCache.listCachedDatabases(); @@ -704,34 +726,42 @@ private void validateTableType(Table tbl) { @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException { rawStore.createTable(tbl); - validateTableType(tbl); + String dbName = StringUtils.normalizeIdentifier(tbl.getDbName()); + String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !shouldCacheTable(dbName, tblName) + || !sharedCacheWrapper.waitForInitAndBlock()) { + return; + } + validateTableType(tbl); try { // Wait if background cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getDbName()), - StringUtils.normalizeIdentifier(tbl.getTableName()), tbl); + sharedCache.addTableToCache(dbName, tblName, tbl); } finally { tableCacheLock.readLock().unlock(); } } @Override - public boolean dropTable(String dbName, String tableName) throws MetaException, + public boolean dropTable(String dbName, String tblName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.dropTable(dbName, tableName); + boolean succ = rawStore.dropTable(dbName, tblName); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return succ; + } // Remove table try { // Wait if background table cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - sharedCache.removeTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName)); + sharedCache.removeTableFromCache(dbName, tblName); } finally { tableCacheLock.readLock().unlock(); } @@ -740,8 +770,7 @@ public boolean dropTable(String dbName, String tableName) throws MetaException, // Wait if background table col stats cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName)); + sharedCache.removeTableColStatsFromCache(dbName, tblName); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -750,13 +779,15 @@ public boolean dropTable(String dbName, String tableName) throws MetaException, } @Override - public Table getTable(String dbName, String tableName) throws MetaException { + public Table getTable(String dbName, String tblName) throws MetaException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTable(dbName, tableName); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getTable(dbName, tblName); } - Table tbl = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName)); + Table tbl = sharedCache.getTableFromCache(dbName, tblName); if (tbl != null) { tbl.unsetPrivileges(); tbl.setRewriteEnabled(tbl.isRewriteEnabled()); @@ -768,14 +799,18 @@ public Table getTable(String dbName, String tableName) throws MetaException { public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartition(part); if (succ) { + String dbName = StringUtils.normalizeIdentifier(part.getDbName()); + String tblName = StringUtils.normalizeIdentifier(part.getTableName()); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(part.getDbName()), - StringUtils.normalizeIdentifier(part.getTableName()), part); + sharedCache.addPartitionToCache(dbName, tblName, part); } finally { partitionCacheLock.readLock().unlock(); } @@ -788,15 +823,19 @@ public boolean addPartitions(String dbName, String tblName, List part throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, parts); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); for (Partition part : parts) { - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), part); + sharedCache.addPartitionToCache(dbName, tblName, part); } } finally { partitionCacheLock.readLock().unlock(); @@ -810,8 +849,13 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); @@ -819,8 +863,7 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); while (iterator.hasNext()) { Partition part = iterator.next(); - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), part); + sharedCache.addPartitionToCache(dbName, tblName, part); } } finally { partitionCacheLock.readLock().unlock(); @@ -830,15 +873,17 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p } @Override - public Partition getPartition(String dbName, String tableName, List part_vals) + public Partition getPartition(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartition(dbName, tableName, part_vals); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getPartition(dbName, tblName, part_vals); } Partition part = - sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + sharedCache.getPartitionFromCache(dbName, tblName, part_vals); if (part != null) { part.unsetPrivileges(); } else { @@ -848,30 +893,36 @@ public Partition getPartition(String dbName, String tableName, List part } @Override - public boolean doesPartitionExist(String dbName, String tableName, + public boolean doesPartitionExist(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.doesPartitionExist(dbName, tableName, part_vals); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.doesPartitionExist(dbName, tblName, part_vals); } - return sharedCache.existPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + return sharedCache.existPartitionFromCache(dbName, tblName, part_vals); } @Override - public boolean dropPartition(String dbName, String tableName, List part_vals) + public boolean dropPartition(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.dropPartition(dbName, tableName, part_vals); + boolean succ = rawStore.dropPartition(dbName, tblName, part_vals); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return succ; + } // Remove partition try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.removePartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + sharedCache.removePartitionFromCache(dbName, tblName, part_vals); } finally { partitionCacheLock.readLock().unlock(); } @@ -880,8 +931,7 @@ public boolean dropPartition(String dbName, String tableName, List part_ // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); } finally { partitionColStatsCacheLock.readLock().unlock(); } @@ -890,14 +940,16 @@ public boolean dropPartition(String dbName, String tableName, List part_ } @Override - public List getPartitions(String dbName, String tableName, int max) + public List getPartitions(String dbName, String tblName, int max) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartitions(dbName, tableName, max); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getPartitions(dbName, tblName, max); } - List parts = sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), max); + List parts = sharedCache.listCachedPartitions(dbName, tblName, max); if (parts != null) { for (Partition part : parts) { part.unsetPrivileges(); @@ -910,98 +962,115 @@ public boolean dropPartition(String dbName, String tableName, List part_ public void alterTable(String dbName, String tblName, Table newTable) throws InvalidObjectException, MetaException { rawStore.alterTable(dbName, tblName, newTable); - validateTableType(newTable); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName()); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - // Update table cache - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - tableCacheLock.readLock().unlock(); - } - // Update partition cache (key might have changed since table name is a - // component of key) - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - partitionCacheLock.readLock().unlock(); + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return; + } else { + if (shouldCacheTable(dbName, newTblName)) { + validateTableType(newTable); + // Update table cache + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), newTable); + } finally { + tableCacheLock.readLock().unlock(); + } + // Update partition cache (key might have changed since table name is a + // component of key) + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), newTable); + } finally { + partitionCacheLock.readLock().unlock(); + } + } else { + // Remove the table and its cached partitions, stats etc, + // since it does not pass the whitelist/blacklist filter. + // Remove table + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + sharedCache.removeTableFromCache(dbName, tblName); + } finally { + tableCacheLock.readLock().unlock(); + } + // Remove partitions + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + sharedCache.removePartitionsFromCache(dbName, tblName); + } finally { + partitionCacheLock.readLock().unlock(); + } + // Remove partition col stats + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + sharedCache.removePartitionColStatsFromCache(dbName, tblName); + } finally { + partitionColStatsCacheLock.readLock().unlock(); + } + } } } @Override - public List getTables(String dbName, String pattern) - throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTables(dbName, pattern); - } - List tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern)) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + public List getTables(String dbName, String pattern) throws MetaException { + // We can't return table names from cache since it may not have all tables cached + return rawStore.getTables(dbName, pattern); } @Override - public List getTables(String dbName, String pattern, - TableType tableType) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTables(dbName, pattern); - } - List tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern) && - table.getTableType().equals(tableType.toString())) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + public List getTables(String dbName, String pattern, TableType tableType) + throws MetaException { + // We can't return table names from cache since it may not have all tables cached + return rawStore.getTables(dbName, pattern); } @Override - public List getTableMeta(String dbNames, String tableNames, - List tableTypes) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTableMeta(dbNames, tableNames, tableTypes); - } - return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames), - StringUtils.normalizeIdentifier(tableNames), tableTypes); + public List getTableMeta(String dbNames, String tableNames, List tableTypes) + throws MetaException { + // We can't return table names from cache since it may not have all tables cached + return rawStore.getTableMeta(dbNames, tableNames, tableTypes); } @Override - public List getTableObjectsByName(String dbName, - List tblNames) throws MetaException, UnknownDBException { + public List
getTableObjectsByName(String dbName, List tblNames) + throws MetaException, UnknownDBException { + dbName = StringUtils.normalizeIdentifier(dbName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized()) { return rawStore.getTableObjectsByName(dbName, tblNames); } List
tables = new ArrayList<>(); for (String tblName : tblNames) { - tables.add(sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName))); + tblName = StringUtils.normalizeIdentifier(tblName); + Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + tbl = rawStore.getTable(dbName, tblName); + } + tables.add(tbl); } return tables; } @Override public List getAllTables(String dbName) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getAllTables(dbName); - } - return getAllTablesInternal(dbName, sharedCache); + // We can't return table names from cache since it may not have all tables cached + return rawStore.getAllTables(dbName); } private static List getAllTablesInternal(String dbName, SharedCache sharedCache) { @@ -1013,37 +1082,26 @@ public void alterTable(String dbName, String tblName, Table newTable) } @Override - public List listTableNamesByFilter(String dbName, String filter, - short max_tables) throws MetaException, UnknownDBException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.listTableNamesByFilter(dbName, filter, max_tables); - } - List tableNames = new ArrayList<>(); - int count = 0; - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), filter) - && (max_tables == -1 || count < max_tables)) { - tableNames.add(table.getTableName()); - count++; - } - } - return tableNames; + public List listTableNamesByFilter(String dbName, String filter, short max_tables) + throws MetaException, UnknownDBException { + // We can't return from cache since it may not have all tables cached + return rawStore.listTableNamesByFilter(dbName, filter, max_tables); } @Override public List listPartitionNames(String dbName, String tblName, short max_parts) throws MetaException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.listPartitionNames(dbName, tblName, max_parts); } List partitionNames = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), max_parts)) { + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, max_parts)) { if (max_parts == -1 || count < max_parts) { partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues())); } @@ -1052,32 +1110,36 @@ public void alterTable(String dbName, String tblName, Table newTable) } @Override - public PartitionValuesResponse listPartitionValues(String db_name, String tbl_name, List cols, - boolean applyDistinct, String filter, boolean ascending, - List order, long maxParts) throws MetaException { + public PartitionValuesResponse listPartitionValues(String db_name, String tbl_name, + List cols, boolean applyDistinct, String filter, boolean ascending, + List order, long maxParts) throws MetaException { throw new UnsupportedOperationException(); } @Override - public List listPartitionNamesByFilter(String db_name, - String tbl_name, String filter, short max_parts) throws MetaException { + public List listPartitionNamesByFilter(String dbName, + String tblName, String filter, short max_parts) throws MetaException { // TODO Translate filter -> expr - return null; + return rawStore.listPartitionNamesByFilter(dbName, tblName, filter, max_parts); } @Override public void alterPartition(String dbName, String tblName, List partVals, Partition newPart) throws InvalidObjectException, MetaException { rawStore.alterPartition(dbName, tblName, partVals, newPart); - // Update partition cache + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return; + } + // Update partition cache try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } finally { partitionCacheLock.readLock().unlock(); } @@ -1086,8 +1148,7 @@ public void alterPartition(String dbName, String tblName, List partVals, // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.alterPartitionInColStatsCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); } finally { partitionColStatsCacheLock.readLock().unlock(); } @@ -1097,9 +1158,14 @@ public void alterPartition(String dbName, String tblName, List partVals, public void alterPartitions(String dbName, String tblName, List> partValsList, List newParts) throws InvalidObjectException, MetaException { rawStore.alterPartitions(dbName, tblName, partValsList, newParts); - // Update partition cache + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return; + } + // Update partition cache try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); @@ -1107,8 +1173,7 @@ public void alterPartitions(String dbName, String tblName, List> pa for (int i = 0; i < partValsList.size(); i++) { List partVals = partValsList.get(i); Partition newPart = newParts.get(i); - sharedCache.alterPartitionInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } } finally { partitionCacheLock.readLock().unlock(); @@ -1121,8 +1186,7 @@ public void alterPartitions(String dbName, String tblName, List> pa for (int i = 0; i < partValsList.size(); i++) { List partVals = partValsList.get(i); Partition newPart = newParts.get(i); - sharedCache.alterPartitionInColStatsCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); } } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -1185,27 +1249,26 @@ private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, public List getPartitionsByFilter(String dbName, String tblName, String filter, short maxParts) throws MetaException, NoSuchObjectException { - // TODO Auto-generated method stub - return null; + return rawStore.getPartitionsByFilter(dbName, tblName, filter, maxParts); } @Override public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, - String defaultPartitionName, short maxParts, List result) - throws TException { + String defaultPartitionName, short maxParts, List result) throws TException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartitionsByExpr( - dbName, tblName, expr, defaultPartitionName, maxParts, result); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts, + result); } List partNames = new LinkedList<>(); - Table table = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn( - table, expr, defaultPartitionName, maxParts, partNames, sharedCache); + Table table = sharedCache.getTableFromCache(dbName, tblName); + boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr, + defaultPartitionName, maxParts, partNames, sharedCache); for (String partName : partNames) { - Partition part = sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); + Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); part.unsetPrivileges(); result.add(part); } @@ -1213,31 +1276,26 @@ public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, } @Override - public int getNumPartitionsByFilter(String dbName, String tblName, - String filter) throws MetaException, NoSuchObjectException { - // TODO filter -> expr - // SharedCache sharedCache = sharedCacheWrapper.get(); - // if (sharedCache == null) { - return rawStore.getNumPartitionsByFilter(dbName, tblName, filter); - // } - // Table table = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - // StringUtils.normalizeIdentifier(tblName)); - // return 0; + public int getNumPartitionsByFilter(String dbName, String tblName, String filter) + throws MetaException, NoSuchObjectException { + return rawStore.getNumPartitionsByFilter(dbName, tblName, filter); } @Override public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.getNumPartitionsByExpr(dbName, tblName, expr); } String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); List partNames = new LinkedList<>(); - Table table = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - getPartitionNamesPrunedByExprNoTxn( - table, expr, defaultPartName, Short.MAX_VALUE, partNames, sharedCache); + Table table = sharedCache.getTableFromCache(dbName, tblName); + getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames, + sharedCache); return partNames.size(); } @@ -1254,14 +1312,16 @@ public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) @Override public List getPartitionsByNames(String dbName, String tblName, List partNames) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.getPartitionsByNames(dbName, tblName, partNames); } List partitions = new ArrayList<>(); for (String partName : partNames) { - Partition part = sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); + Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); if (part!=null) { partitions.add(part); } @@ -1429,15 +1489,16 @@ public Role getRole(String roleName) throws NoSuchObjectException { public Partition getPartitionWithAuth(String dbName, String tblName, List partVals, String userName, List groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames); } - Partition p = sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals); + Partition p = sharedCache.getPartitionFromCache(dbName, tblName, partVals); if (p!=null) { - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames); @@ -1450,16 +1511,17 @@ public Partition getPartitionWithAuth(String dbName, String tblName, public List getPartitionsWithAuth(String dbName, String tblName, short maxParts, String userName, List groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames); } - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); List partitions = new ArrayList<>(); int count = 0; - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), maxParts)) { + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { if (maxParts == -1 || count < maxParts) { String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, @@ -1476,16 +1538,17 @@ public Partition getPartitionWithAuth(String dbName, String tblName, public List listPartitionNamesPs(String dbName, String tblName, List partVals, short maxParts) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts); } List partNames = new ArrayList<>(); int count = 0; - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), maxParts)) { + Table t = sharedCache.getTableFromCache(dbName, tblName); + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; for (int i=0;i listPartitionsPsWithAuth(String dbName, - String tblName, List partVals, short maxParts, String userName, - List groupNames) + public List listPartitionsPsWithAuth(String dbName, String tblName, + List partVals, short maxParts, String userName, List groupNames) throws MetaException, InvalidObjectException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.listPartitionsPsWithAuth( - dbName, tblName, partVals, maxParts, userName, groupNames); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName, + groupNames); } List partitions = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), maxParts)) { + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; - for (int i=0;i statsObjs = colStats.getStatsObj(); - Table tbl = getTable(dbName, tableName); + Table tbl = getTable(dbName, tblName); List colNames = new ArrayList<>(); for (ColumnStatisticsObj statsObj : statsObjs) { colNames.add(statsObj.getColName()); } StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); - // Update table try { // Wait if background cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), tbl); + sharedCache.alterTableInCache(dbName, tblName, tbl); } finally { tableCacheLock.readLock().unlock(); } - // Update table col stats try { // Wait if background cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - sharedCache.updateTableColStatsInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), statsObjs); + sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -1588,18 +1650,19 @@ public boolean updateTableColumnStatistics(ColumnStatistics colStats) } @Override - public ColumnStatistics getTableColumnStatistics(String dbName, String tableName, + public ColumnStatistics getTableColumnStatistics(String dbName, String tblName, List colNames) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTableColumnStatistics(dbName, tableName, colNames); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getTableColumnStatistics(dbName, tblName, colNames); } - ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName); + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); List colStatObjs = new ArrayList<>(); for (String colName : colNames) { - String colStatsCacheKey = - CacheUtils.buildKey(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), colName); + String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName); ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey); if (colStat != null) { colStatObjs.add(colStat); @@ -1613,18 +1676,22 @@ public ColumnStatistics getTableColumnStatistics(String dbName, String tableName } @Override - public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName) + public boolean deleteTableColumnStatistics(String dbName, String tblName, String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.deleteTableColumnStatistics(dbName, tableName, colName); + boolean succ = rawStore.deleteTableColumnStatistics(dbName, tblName, colName); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), colName); + sharedCache.removeTableColStatsFromCache(dbName, tblName, colName); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -1637,37 +1704,35 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List statsObjs = colStats.getStatsObj(); - Partition part = getPartition(dbName, tableName, partVals); + Partition part = getPartition(dbName, tblName, partVals); List colNames = new ArrayList<>(); for (ColumnStatisticsObj statsObj : statsObjs) { colNames.add(statsObj.getColName()); } StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); - // Update partition try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), partVals, part); + sharedCache.alterPartitionInCache(dbName, tblName, partVals, part); } finally { partitionCacheLock.readLock().unlock(); } - // Update partition column stats try { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.updatePartitionColStatsInCache( - StringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), - StringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, + sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, colStats.getStatsObj()); } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -1678,27 +1743,30 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List getPartitionColumnStatistics(String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException { return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); } @Override - public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName, - List partVals, String colName) throws NoSuchObjectException, MetaException, - InvalidObjectException, InvalidInputException { + public boolean deletePartitionColumnStatistics(String dbName, String tblName, String partName, + List partVals, String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = - rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName); + rawStore.deletePartitionColumnStatistics(dbName, tblName, partName, partVals, colName); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), partVals, colName); + sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName); } finally { partitionColStatsCacheLock.readLock().unlock(); } @@ -1708,15 +1776,8 @@ public boolean deletePartitionColumnStatistics(String dbName, String tableName, @Override public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, - List colNames) throws MetaException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - } - List colStats = mergeColStatsForPartitions( - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), - partNames, colNames, sharedCache); - return new AggrStats(colStats, partNames.size()); + List colNames) throws MetaException, NoSuchObjectException { + return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); } private List mergeColStatsForPartitions(String dbName, String tblName, @@ -1821,8 +1882,13 @@ public void setMetaStoreSchemaVersion(String version, String comment) public void dropPartitions(String dbName, String tblName, List partNames) throws MetaException, NoSuchObjectException { rawStore.dropPartitions(dbName, tblName, partNames); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInitAndBlock() + || !isCachedTable(dbName, tblName)) { + return; + } // Remove partitions try { // Wait if background cache update is happening @@ -1830,8 +1896,7 @@ public void dropPartitions(String dbName, String tblName, List partNames isPartitionCacheDirty.set(true); for (String partName : partNames) { List vals = partNameToVals(partName); - sharedCache.removePartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), vals); + sharedCache.removePartitionFromCache(dbName, tblName, vals); } } finally { partitionCacheLock.readLock().unlock(); @@ -1843,8 +1908,7 @@ public void dropPartitions(String dbName, String tblName, List partNames isPartitionColStatsCacheDirty.set(true); for (String partName : partNames) { List part_vals = partNameToVals(partName); - sharedCache.removePartitionColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), part_vals); + sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); } } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -2017,29 +2081,17 @@ public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) { @Override public int getTableCount() throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTableCount(); - } - return sharedCache.getCachedTableCount(); + return rawStore.getTableCount(); } @Override public int getPartitionCount() throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartitionCount(); - } - return sharedCache.getCachedPartitionCount(); + return rawStore.getPartitionCount(); } @Override public int getDatabaseCount() throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getDatabaseCount(); - } - return sharedCache.getCachedDatabaseCount(); + return rawStore.getDatabaseCount(); } @Override @@ -2072,16 +2124,19 @@ public int getDatabaseCount() throws MetaException { } @Override - public List createTableWithConstraints(Table tbl, - List primaryKeys, List foreignKeys, - List uniqueConstraints, - List notNullConstraints) - throws InvalidObjectException, MetaException { + public List createTableWithConstraints(Table tbl, List primaryKeys, + List foreignKeys, List uniqueConstraints, + List notNullConstraints) throws InvalidObjectException, MetaException { // TODO constraintCache - List constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys, - uniqueConstraints, notNullConstraints); + List constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, + foreignKeys, uniqueConstraints, notNullConstraints); + String dbName = StringUtils.normalizeIdentifier(tbl.getDbName()); + String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return constraintNames; + if ((sharedCache == null) || !shouldCacheTable(dbName, tblName) + || !sharedCacheWrapper.waitForInitAndBlock()) { + return constraintNames; + } sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getDbName()), StringUtils.normalizeIdentifier(tbl.getTableName()), tbl); return constraintNames; @@ -2152,7 +2207,7 @@ public String getMetastoreDbUuid() throws MetaException { private final SharedCache instance = new SharedCache(); private final Object initLock = new Object(); - private InitState initState = InitState.NOT_ENABLED; + private volatile InitState initState = InitState.NOT_ENABLED; // We preserve the old setConf init behavior, where a failed prewarm would fail the query // and give a chance to another query to try prewarming again. Basically, we'd increment the // count and all the queries waiting for prewarm would fail; however, we will retry the prewarm @@ -2171,7 +2226,7 @@ void updateInitState(Throwable error, boolean isFatal) { if (isSuccessful) { initState = InitState.INITIALIZED; } else if (isFatal) { - initState = InitState.FAILED_FATAL; + initState = InitState.FAILED_FATAL; lastError = error; } else { ++initFailureCount; @@ -2234,6 +2289,56 @@ private boolean waitForInit() throws MetaException { } } } + + /** + * Block while waiting for initialization to complete + * The thread which is blocked, will be woken up by initLock.notifyAll + * @return + * @throws MetaException + */ + boolean waitForInitAndBlock() throws MetaException { + synchronized (initLock) { + int localFailureCount = initFailureCount; + while (true) { + switch (initState) { + case INITIALIZED: return true; + case NOT_ENABLED: return false; + case FAILED_FATAL: { + throw new RuntimeException("CachedStore prewarm had a fatal error", lastError); + } + case INITIALIZING: { + try { + initLock.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MetaException("Interrupted"); + } + break; + } + default: throw new AssertionError(initState); + } + // Fail if any errors occured; mimicks the old behavior where a setConf prewarm failure + // would fail the current task, but cause the next setConf to try prewarm again forever. + if (initFailureCount != localFailureCount) { + throw new RuntimeException("CachedStore prewarm failed", lastError); + } + } + } + } + + /** + * Notify all threads blocked on initialization, to continue work. (We allow read while prewarm + * is running; all write calls are blocked using waitForInitAndBlock). + */ + void notifyAllBlocked() { + synchronized (initLock) { + initLock.notifyAll(); + } + } + + boolean isInitialized() { + return initState.equals(InitState.INITIALIZED); + } } @VisibleForTesting @@ -2305,4 +2410,64 @@ public void dropWMTrigger(String resourcePlanName, String triggerName) throws NoSuchObjectException, MetaException { return rawStore.getTriggersForResourcePlan(resourcePlanName); } + + static boolean isInBlacklist(String dbName, String tblName) { + String str = dbName + "." + tblName; + for (Pattern pattern : blacklistPatterns) { + Matcher matcher = pattern.matcher(str); + if (matcher.find()) { + return false; + } + } + return true; + } + + static boolean isInWhitelist(String dbName, String tblName) { + String str = dbName + "." + tblName; + for (Pattern pattern : whitelistPatterns) { + Matcher matcher = pattern.matcher(str); + if (matcher.find()) { + return true; + } + } + return false; + } + + // For testing + static void setWhitelistPattern(List patterns) { + whitelistPatterns = patterns; + } + + // For testing + static void setBlacklistPattern(List patterns) { + blacklistPatterns = patterns; + } + + // Determines if we should cache a table (& its partitions, stats etc), + // based on whitelist/blacklist + static boolean shouldCacheTable(String dbName, String tblName) { + boolean blacklistCheck = isInBlacklist(dbName, tblName); + LOG.debug("Blacklist check: {}", blacklistCheck); + boolean whitelistCheck = isInWhitelist(dbName, tblName); + LOG.debug("Whitelist check: {}", whitelistCheck); + return blacklistCheck && whitelistCheck; + } + + // Checks if the table exists in cache. + // If it does, we have also cached its partitions, stats etc + // Faster than shouldCacheTable, so we use this in most read queries, + // to decide if we can serve metadata from cache + private boolean isCachedTable(String dbName, String tblName) throws MetaException { + SharedCache sharedCache = sharedCacheWrapper.get(); + return sharedCache.getTableFromCache(dbName, tblName).equals(null) ? false : true; + } + + static List createPatterns(String configStr) { + List patternStrs = Arrays.asList(configStr.split(",")); + List patterns = new ArrayList(); + for (String str : patternStrs) { + patterns.add(Pattern.compile(str)); + } + return patterns; + } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index a76b8480b0..6f871b32e0 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -318,6 +318,28 @@ public synchronized Partition removePartitionFromCache(String dbName, String tbl return wrapper.getPartition(); } + /** + * Given a db + table, remove all partitions for this table from the cache + * @param dbName + * @param tblName + * @return + */ + public synchronized void removePartitionsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator> iterator = partitionCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + PartitionWrapper wrapper = entry.getValue(); + if (wrapper.getSdHash() != null) { + decrSd(wrapper.getSdHash()); + } + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + // Remove cached column stats for all partitions of a table public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) { String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); @@ -353,6 +375,25 @@ public synchronized void removePartitionColStatsFromCache(String dbName, String partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName)); } + /** + * Given a db + table, remove all partition col stats for this table from the cache + * + * @param dbName + * @param tblName + */ + public synchronized void removePartitionsColStatsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + public synchronized List listCachedPartitions(String dbName, String tblName, int max) { List partitions = new ArrayList<>(); int count = 0;