commit fc4ec1f7d00ba7591e36955135bd67d056b0532e Author: Daniel Dai Date: Mon May 6 22:41:52 2019 -0700 HIVE-21697: Remove periodical full refresh in HMS cache diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index bac9d01..083c0d4 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -349,9 +349,6 @@ public static ConfVars getMetaConf(String name) { CATALOG_DEFAULT("metastore.catalog.default", "metastore.catalog.default", "hive", "The default catalog to use when a catalog is not specified. Default is 'hive' (the " + "default catalog)."), - CATALOGS_TO_CACHE("metastore.cached.rawstore.catalogs", "metastore.cached.rawstore.catalogs", - "hive", "Comma separated list of catalogs to cache in the CachedStore. Default is 'hive' " + - "(the default catalog). Empty string means all catalogs will be cached."), CLIENT_CONNECT_RETRY_DELAY("metastore.client.connect.retry.delay", "hive.metastore.client.connect.retry.delay", 1, TimeUnit.SECONDS, "Number of seconds for the client to wait between consecutive connection attempts"), @@ -975,8 +972,6 @@ public static ConfVars getMetaConf(String name) { "Time interval describing how often the reaper runs"), TOKEN_SIGNATURE("metastore.token.signature", "hive.metastore.token.signature", "", "The delegation token service name to match when selecting a token from the current user's tokens."), - METASTORE_CACHE_CAN_USE_EVENT("metastore.cache.can.use.event", "hive.metastore.cache.can.use.event", false, - "Can notification events from notification log table be used for updating the metastore cache."), TRANSACTIONAL_EVENT_LISTENERS("metastore.transactional.event.listeners", "hive.metastore.transactional.event.listeners", "", "A comma separated list of Java classes that implement the org.apache.riven.MetaStoreEventListener" + diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 1a694fb..dbb46d2 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -583,19 +583,6 @@ public void init() throws MetaException { listeners.add(new HMSMetricsListener(conf)); } - boolean canCachedStoreCanUseEvent = false; - for (MetaStoreEventListener listener : transactionalListeners) { - if (listener.doesAddEventsToNotificationLogTable()) { - canCachedStoreCanUseEvent = true; - break; - } - } - if (conf.getBoolean(ConfVars.METASTORE_CACHE_CAN_USE_EVENT.getVarname(), false) && - !canCachedStoreCanUseEvent) { - throw new MetaException("CahcedStore can not use events for invalidation as there is no " + - " TransactionalMetaStoreEventListener to add events to notification table"); - } - endFunctionListeners = MetaStoreServerUtils.getMetaStoreListeners( MetaStoreEndFunctionListener.class, conf, MetastoreConf.getVar(conf, ConfVars.END_FUNCTION_LISTENERS)); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index a5d0c04..28e6ec2 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -121,13 +121,17 @@ private static boolean areTxnStatsSupported; private PartitionExpressionProxy expressionProxy = null; private static SharedCache sharedCache = new SharedCache(); - private static boolean canUseEvents = false; private static long lastEventId; static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); @Override public void setConf(Configuration conf) { + if (conf.get(ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getVarname()) == null || + conf.get(ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getVarname()).isEmpty()) { + throw new RuntimeException("CachedStore cannot use events for invalidation as there is no " + + "TransactionalMetaStoreEventListener to add events to the notification table"); + } setConfInternal(conf); initBlackListWhiteList(conf); initSharedCache(conf); @@ -170,13 +174,6 @@ synchronized private static void triggerPreWarm(RawStore rawStore) { } private void setConfInternal(Configuration conf) { - if (MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT)) { - canUseEvents = true; - } else { - canUseEvents = false; - } - LOG.info("canUseEvents is set to " + canUseEvents + " in cached Store"); - String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName()); if (rawStore == null) { @@ -417,15 +414,9 @@ static void prewarm(RawStore rawStore) { Deadline.registerIfNot(1000000); Collection<String> catalogsToCache; try { - catalogsToCache = catalogsToCache(rawStore); - LOG.info("Going to cache catalogs: " + org.apache.commons.lang.StringUtils.join(catalogsToCache, ", ")); - List<Catalog> catalogs = new ArrayList<>(catalogsToCache.size()); - for (String catName : catalogsToCache) { - catalogs.add(rawStore.getCatalog(catName)); - } - sharedCache.populateCatalogsInCache(catalogs); - } catch (MetaException | NoSuchObjectException e) { - LOG.warn("Failed to populate catalogs in cache, going to try again", e); + catalogsToCache = rawStore.getCatalogs(); + } catch (MetaException e) { + LOG.warn("Failed to get catalogs, going to try again", e); try { Thread.sleep(sleepTime); sleepTime = sleepTime * 2; @@ -435,7 +426,7 @@ static void prewarm(RawStore rawStore) { // try again continue; } - LOG.info("Finished prewarming catalogs, starting on databases"); + LOG.info("Finished getting catalogs, starting to cache databases"); List<Database> databases = new ArrayList<>(); for (String catName : catalogsToCache) { try { @@ -617,17 +608,6 @@ private static void initBlackListWhiteList(Configuration conf) { MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)); } - private static Collection<String> catalogsToCache(RawStore rs) throws MetaException { - Collection<String> confValue = - MetastoreConf.getStringCollection(rs.getConf(), ConfVars.CATALOGS_TO_CACHE); - if (confValue == null || confValue.isEmpty() || - (confValue.size() == 1 && confValue.contains(""))) { - return rs.getCatalogs(); - } else { - return confValue; - } - } - @VisibleForTesting /** * This starts a background thread, which initially populates the SharedCache and later @@ -711,19 +691,10 @@ static void setCacheRefreshPeriod(long time) { @Override public void run() { if (!shouldRunPrewarm) { - if
(canUseEvents) { - try { - triggerUpdateUsingEvent(rawStore); - } catch (Exception e) { - LOG.error("failed to update cache using events ", e); - } - } else { - // TODO: prewarm and update can probably be merged. - try { - update(); - } catch (Exception e) { - LOG.error("periodical refresh fail ", e); - } + try { + triggerUpdateUsingEvent(rawStore); + } catch (Exception e) { + LOG.error("Failed to update cache using events", e); } } else { try { @@ -736,222 +707,6 @@ public void run() { } } - void update() { - Deadline.registerIfNot(1000000); - LOG.debug("CachedStore: updating cached objects. Shared cache has been update {} times so far.", - sharedCache.getUpdateCount()); - try { - for (String catName : catalogsToCache(rawStore)) { - List<String> dbNames = rawStore.getAllDatabases(catName); - // Update the database in cache - updateDatabases(rawStore, catName, dbNames); - for (String dbName : dbNames) { - // Update the tables in cache - updateTables(rawStore, catName, dbName); - List<String> tblNames; - try { - tblNames = rawStore.getAllTables(catName, dbName); - } catch (MetaException e) { - LOG.debug(ExceptionUtils.getStackTrace(e)); - // Continue with next database - continue; - } - for (String tblName : tblNames) { - if (!shouldCacheTable(catName, dbName, tblName)) { - continue; - } - // Update the table column stats for a table in cache - updateTableColStats(rawStore, catName, dbName, tblName); - // Update the partitions for a table in cache - updateTablePartitions(rawStore, catName, dbName, tblName); - // Update the partition col stats for a table in cache - updateTablePartitionColStats(rawStore, catName, dbName, tblName); - // Update aggregate partition column stats for a table in cache - updateTableAggregatePartitionColStats(rawStore, catName, dbName, tblName); - } - } - } - sharedCache.incrementUpdateCount(); - LOG.debug("CachedStore: updated cached objects. Shared cache update count is: {}", - sharedCache.getUpdateCount()); - } catch (MetaException e) { - LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e); - } - } - - private void updateDatabases(RawStore rawStore, String catName, List<String> dbNames) { - LOG.debug("CachedStore: updating cached database objects for catalog: {}", catName); - boolean success = false; - // Try MAX_RETRIES times, then move to next method - int maxTries = MAX_RETRIES; - while (!success && (maxTries-- > 0)) { - // Prepare the list of databases - List<Database> databases = new ArrayList<>(); - for (String dbName : dbNames) { - Database db; - try { - db = rawStore.getDatabase(catName, dbName); - databases.add(db); - } catch (NoSuchObjectException e) { - LOG.info("Updating CachedStore: database: " + catName + "."
+ dbName + " does not exist.", e); - } - } - success = sharedCache.refreshDatabasesInCache(databases); - LOG.debug("CachedStore: updated cached database objects for catalog: {}", catName); - } - } - - private void updateTables(RawStore rawStore, String catName, String dbName) { - LOG.debug("CachedStore: updating cached table objects for catalog: {}, database: {}", catName, dbName); - boolean success = false; - // Try MAX_RETRIES times, then move to next method - int maxTries = MAX_RETRIES; - while (!success && (maxTries-- > 0)) { - List tables = new ArrayList<>(); - try { - List tblNames = rawStore.getAllTables(catName, dbName); - for (String tblName : tblNames) { - if (!shouldCacheTable(catName, dbName, tblName)) { - continue; - } - Table table = rawStore.getTable(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName)); - tables.add(table); - } - success = sharedCache.refreshTablesInCache(catName, dbName, tables); - LOG.debug("CachedStore: updated cached table objects for catalog: {}, database: {}", catName, dbName); - } catch (MetaException e) { - LOG.debug("Unable to refresh cached tables for database: " + dbName, e); - } - } - } - - private void updateTableColStats(RawStore rawStore, String catName, String dbName, String tblName) { - LOG.debug("CachedStore: updating cached table col stats objects for catalog: {}, database: {}", catName, dbName); - boolean committed = false; - rawStore.openTransaction(); - try { - Table table = rawStore.getTable(catName, dbName, tblName); - if (table != null && !table.isSetPartitionKeys()) { - List colNames = MetaStoreUtils.getColumnNamesForTable(table); - Deadline.startTimer("getTableColumnStatistics"); - ColumnStatistics tableColStats = rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); - Deadline.stopTimer(); - if (tableColStats != null) { - sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), - tableColStats.getStatsObj()); - // Update the table to get consistent stats state. 
- sharedCache.alterTableInCache(catName, dbName, tblName, table); - } - } - committed = rawStore.commitTransaction(); - LOG.debug("CachedStore: updated cached table col stats objects for catalog: {}, database: {}", catName, dbName); - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Unable to refresh table column stats for table: " + tblName, e); - } finally { - if (!committed) { - sharedCache.removeAllTableColStatsFromCache(catName, dbName, tblName); - rawStore.rollbackTransaction(); - } - } - } - - private void updateTablePartitions(RawStore rawStore, String catName, String dbName, String tblName) { - LOG.debug("CachedStore: updating cached partition objects for catalog: {}, database: {}, table: {}", catName, - dbName, tblName); - try { - Deadline.startTimer("getPartitions"); - List partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE); - Deadline.stopTimer(); - sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), partitions); - LOG.debug("CachedStore: updated cached partition objects for catalog: {}, database: {}, table: {}", catName, - dbName, tblName); - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); - } - } - - private void updateTablePartitionColStats(RawStore rawStore, String catName, String dbName, String tblName) { - LOG.debug("CachedStore: updating cached partition col stats objects for catalog: {}, database: {}, table: {}", - catName, dbName, tblName); - boolean committed = false; - rawStore.openTransaction(); - try { - Table table = rawStore.getTable(catName, dbName, tblName); - if (table != null) { - List colNames = MetaStoreUtils.getColumnNamesForTable(table); - List partNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); - // Get partition column stats for this table - Deadline.startTimer("getPartitionColumnStatistics"); - List partitionColStats = - rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - sharedCache.refreshPartitionColStatsInCache(catName, dbName, tblName, partitionColStats); - Deadline.startTimer("getPartitionsByNames"); - List parts = rawStore.getPartitionsByNames(catName, dbName, tblName, partNames); - Deadline.stopTimer(); - // Also save partitions for consistency as they have the stats state. 
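The refresh helpers being deleted in this hunk share one defensive shape: read fresh state from the RawStore inside a transaction, push it into the SharedCache, and on failure purge the affected entries rather than leave a half-refreshed view behind. A minimal sketch of that shape, using hypothetical refresh/purge callbacks instead of the real SharedCache API:

public final class TransactionalRefreshSketch {
  public interface TxStore {
    void openTransaction();
    boolean commitTransaction();
    void rollbackTransaction();
  }

  // Mirrors the try/finally in the removed updateTableColStats and
  // updateTablePartitionColStats: a failed refresh or commit invalidates the
  // cached stats instead of leaving stale-but-plausible values behind.
  public static void refreshOrPurge(TxStore store, Runnable refresh, Runnable purge) {
    boolean committed = false;
    store.openTransaction();
    try {
      refresh.run();   // e.g. re-read column stats and write them into the cache
      committed = store.commitTransaction();
    } finally {
      if (!committed) {
        purge.run();   // e.g. removeAllPartitionColStatsFromCache(catName, dbName, tblName)
        store.rollbackTransaction();
      }
    }
  }
}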
- for (Partition part : parts) { - sharedCache.alterPartitionInCache(catName, dbName, tblName, part.getValues(), part); - } - } - committed = rawStore.commitTransaction(); - LOG.debug("CachedStore: updated cached partition col stats objects for catalog: {}, database: {}, table: {}", - catName, dbName, tblName); - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); - } finally { - if (!committed) { - sharedCache.removeAllPartitionColStatsFromCache(catName, dbName, tblName); - rawStore.rollbackTransaction(); - } - } - } - - // Update cached aggregate stats for all partitions of a table and for all - // but default partition - private static void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName, - String tblName) { - LOG.debug("CachedStore: updating cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}", - catName, dbName, tblName); - try { - Table table = rawStore.getTable(catName, dbName, tblName); - if (table == null) { - return; - } - List partNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); - List colNames = MetaStoreUtils.getColumnNamesForTable(table); - if ((partNames != null) && (partNames.size() > 0)) { - Deadline.startTimer("getAggregareStatsForAllPartitions"); - AggrStats aggrStatsAllPartitions = rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - // Remove default partition from partition names and get aggregate stats again - List partKeys = table.getPartitionKeys(); - String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List partCols = new ArrayList(); - List partVals = new ArrayList(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); - } - String defaultPartitionName = FileUtils.makePartName(partCols, partVals); - partNames.remove(defaultPartitionName); - Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); - AggrStats aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, - aggrStatsAllButDefaultPartition, null); - LOG.debug("CachedStore: updated cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}", - catName, dbName, tblName); - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, e); - } - } } @Override @@ -971,27 +726,7 @@ public boolean openTransaction() { @Override public boolean commitTransaction() { - if (!rawStore.commitTransaction()) { - return false; - } - - // In case of event based update, shared cache is not updated directly to avoid inconsistency. - // For example, if metastore B add a partition, then metastore A drop a partition later. However, on metastore A, - // it first get drop partition request, then from notification, create the partition. If there's no tombstone - // entry in partition cache to tell drop is after creation, we end up consumes the creation request. Though - // eventually there's drop partition notification, but during the interim, later event takes precedence. 
- // So we will not update the cache during raw store operation but wait during commit transaction to make sure that - // the event related to the current transactions are updated in the cache and thus we can support strong - // consistency in case there is only one metastore. - if (canUseEvents) { - try { - triggerUpdateUsingEvent(rawStore); - } catch (Exception e) { - //TODO : Not sure how to handle it as the commit is already done in the object store. - LOG.error("Failed to update cache", e); - } - } - return true; + return rawStore.commitTransaction(); } @Override @@ -1007,70 +742,38 @@ public void rollbackTransaction() { @Override public void createCatalog(Catalog cat) throws MetaException { rawStore.createCatalog(cat); - // in case of event based cache update, cache will not be updated for catalog. - if (!canUseEvents) { - sharedCache.addCatalogToCache(cat); - } } @Override public void alterCatalog(String catName, Catalog cat) throws MetaException, InvalidOperationException { rawStore.alterCatalog(catName, cat); - // in case of event based cache update, cache will not be updated for catalog. - if (!canUseEvents) { - sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat); - } } @Override public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException { - // in case of event based cache update, cache will not be updated for catalog. - if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) { - return rawStore.getCatalog(catalogName); - } - Catalog cat = sharedCache.getCatalogFromCache(normalizeIdentifier(catalogName)); - if (cat == null) { - throw new NoSuchObjectException(); - } - return cat; + return rawStore.getCatalog(catalogName); } @Override public List getCatalogs() throws MetaException { - // in case of event based cache update, cache will not be updated for catalog. - if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) { - return rawStore.getCatalogs(); - } - return sharedCache.listCachedCatalogs(); + return rawStore.getCatalogs(); } @Override public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException { rawStore.dropCatalog(catalogName); - - // in case of event based cache update, cache will not be updated for catalog. - if (!canUseEvents) { - catalogName = catalogName.toLowerCase(); - sharedCache.removeCatalogFromCache(catalogName); - } } @Override public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); - // in case of event based cache update, cache will be updated during commit. - if (!canUseEvents) { - sharedCache.addDatabaseToCache(db); - } + sharedCache.addDatabaseToCache(db); } @Override public Database getDatabase(String catName, String dbName) throws NoSuchObjectException { - // in case of event based cache update, cache will be updated during commit. So within active transaction, read - // directly from rawStore to avoid reading stale data as the data updated during same transaction will not be - // updated in the cache. 
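With the commit-time event replay gone, every mutation in this class reverts to plain write-through, visible in the hunks that follow: call the RawStore first, and touch the SharedCache only once the store reports success. A minimal sketch of the idiom; Backing and Cache are hypothetical stand-ins, not the real RawStore/SharedCache types:

public final class WriteThroughSketch {
  public interface Backing {                 // stands in for RawStore
    boolean dropDatabase(String cat, String db) throws Exception;
  }

  public interface Cache {                   // stands in for SharedCache
    void removeDatabase(String cat, String db);
  }

  // The cache is updated only after the backing store succeeds, so a failed
  // raw operation can never evict a still-valid cache entry.
  public static boolean dropDatabase(Backing store, Cache cache, String cat, String db)
      throws Exception {
    boolean succ = store.dropDatabase(cat, db);
    if (succ) {
      cache.removeDatabase(cat.toLowerCase(), db.toLowerCase());
    }
    return succ;
  }
}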
- if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) { + if (!sharedCache.isDatabaseCachePrewarmed()) { return rawStore.getDatabase(catName, dbName); } dbName = dbName.toLowerCase(); @@ -1085,7 +788,7 @@ public Database getDatabase(String catName, String dbName) throws NoSuchObjectEx @Override public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException { boolean succ = rawStore.dropDatabase(catName, dbName); - if (succ && !canUseEvents) { + if (succ) { // in case of event based cache update, cache will be updated during commit. sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName)); @@ -1097,7 +800,7 @@ public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectEx public boolean alterDatabase(String catName, String dbName, Database db) throws NoSuchObjectException, MetaException { boolean succ = rawStore.alterDatabase(catName, dbName, db); - if (succ && !canUseEvents) { + if (succ) { // in case of event based cache update, cache will be updated during commit. sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName), db); @@ -1107,7 +810,7 @@ public boolean alterDatabase(String catName, String dbName, Database db) @Override public List getDatabases(String catName, String pattern) throws MetaException { - if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) { + if (!sharedCache.isDatabaseCachePrewarmed()) { return rawStore.getDatabases(catName, pattern); } return sharedCache.listCachedDatabases(catName, pattern); @@ -1115,7 +818,7 @@ public boolean alterDatabase(String catName, String dbName, Database db) @Override public List getAllDatabases(String catName) throws MetaException { - if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) { + if (!sharedCache.isDatabaseCachePrewarmed()) { return rawStore.getAllDatabases(catName); } return sharedCache.listCachedDatabases(catName); @@ -1157,10 +860,6 @@ private void validateTableType(Table tbl) { @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException { rawStore.createTable(tbl); - // in case of event based cache update, cache will be updated during commit. - if (canUseEvents) { - return; - } String catName = normalizeIdentifier(tbl.getCatName()); String dbName = normalizeIdentifier(tbl.getDbName()); String tblName = normalizeIdentifier(tbl.getTableName()); @@ -1176,7 +875,7 @@ public boolean dropTable(String catName, String dbName, String tblName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropTable(catName, dbName, tblName); // in case of event based cache update, cache will be updated during commit. 
- if (succ && !canUseEvents) { + if (succ) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1198,7 +897,7 @@ public Table getTable(String catName, String dbName, String tblName, String vali catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.getTable(catName, dbName, tblName, validWriteIds); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1240,8 +939,7 @@ public Table getTable(String catName, String dbName, String tblName, String vali @Override public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartition(part); - // in case of event based cache update, cache will be updated during commit. - if (succ && !canUseEvents) { + if (succ) { String dbName = normalizeIdentifier(part.getDbName()); String tblName = normalizeIdentifier(part.getTableName()); String catName = part.isSetCatName() ? normalizeIdentifier(part.getCatName()) : DEFAULT_CATALOG_NAME; @@ -1257,8 +955,7 @@ public boolean addPartition(Partition part) throws InvalidObjectException, MetaE public boolean addPartitions(String catName, String dbName, String tblName, List parts) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(catName, dbName, tblName, parts); - // in case of event based cache update, cache will be updated during commit. - if (succ && !canUseEvents) { + if (succ) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1274,8 +971,7 @@ public boolean addPartitions(String catName, String dbName, String tblName, List public boolean addPartitions(String catName, String dbName, String tblName, PartitionSpecProxy partitionSpec, boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(catName, dbName, tblName, partitionSpec, ifNotExists); - // in case of event based cache update, cache will be updated during commit. 
- if (succ && !canUseEvents) { + if (succ) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1304,7 +1000,7 @@ public Partition getPartition(String catName, String dbName, String tblName, catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.getPartition( catName, dbName, tblName, part_vals, validWriteIds); } @@ -1335,7 +1031,7 @@ public boolean doesPartitionExist(String catName, String dbName, String tblName, catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1350,8 +1046,7 @@ public boolean doesPartitionExist(String catName, String dbName, String tblName, public boolean dropPartition(String catName, String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropPartition(catName, dbName, tblName, part_vals); - // in case of event based cache update, cache will be updated during commit. - if (succ && !canUseEvents) { + if (succ) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1367,10 +1062,6 @@ public boolean dropPartition(String catName, String dbName, String tblName, List public void dropPartitions(String catName, String dbName, String tblName, List partNames) throws MetaException, NoSuchObjectException { rawStore.dropPartitions(catName, dbName, tblName, partNames); - // in case of event based cache update, cache will be updated during commit. 
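Note how every branch above normalizes catName/dbName/tblName before touching the SharedCache: Hive identifiers are case-insensitive, so cache keys must be built from a canonical form or the same table would occupy several slots. An illustrative key builder in the spirit of CacheUtils.buildTableKey; the delimiter and class name here are hypothetical:

import java.util.Locale;

public final class CacheKeySketch {
  // Lowercase each part before joining so that lookups for "Hive.Sales.ORDERS"
  // and "hive.sales.orders" resolve to the same cache entry.
  static String buildTableKey(String cat, String db, String tbl) {
    return cat.toLowerCase(Locale.ROOT) + "#" + db.toLowerCase(Locale.ROOT)
        + "#" + tbl.toLowerCase(Locale.ROOT);
  }

  public static void main(String[] args) {
    System.out.println(buildTableKey("Hive", "Sales", "ORDERS")
        .equals(buildTableKey("hive", "sales", "orders")));   // prints: true
  }
}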
- if (canUseEvents) { - return; - } catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); @@ -1390,7 +1081,7 @@ public void dropPartitions(String catName, String dbName, String tblName, List getTables(String catName, String dbName, String pattern) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || - (canUseEvents && rawStore.isActiveTransaction())) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get()) { return rawStore.getTables(catName, dbName, pattern); } return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), @@ -1461,8 +1147,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena @Override public List getTables(String catName, String dbName, String pattern, TableType tableType) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()|| !isCachedAllMetadata.get() - || (canUseEvents && rawStore.isActiveTransaction())) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()|| !isCachedAllMetadata.get()) { return rawStore.getTables(catName, dbName, pattern, tableType); } return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), @@ -1485,8 +1170,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena public List getTableMeta(String catName, String dbNames, String tableNames, List tableTypes) throws MetaException { // TODO Check if all required tables are allowed, if so, get it from cache - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || - (canUseEvents && rawStore.isActiveTransaction())) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get()) { return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes); } return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName), @@ -1497,9 +1181,6 @@ public void updateCreationMetadata(String catName, String dbname, String tablena @Override public List
<Table> getTableObjectsByName(String catName, String dbName, List<String> tblNames) throws MetaException, UnknownDBException { - if (canUseEvents && rawStore.isActiveTransaction()) { - return rawStore.getTableObjectsByName(catName, dbName, tblNames); - } dbName = normalizeIdentifier(dbName); catName = normalizeIdentifier(catName); boolean missSomeInCache = false; @@ -1534,8 +1215,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena @Override public List<String> getAllTables(String catName, String dbName) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || - (canUseEvents && rawStore.isActiveTransaction())) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get()) { return rawStore.getAllTables(catName, dbName); } return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), @@ -1555,7 +1235,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.listPartitionNames(catName, dbName, tblName, max_parts); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1585,10 +1265,6 @@ public Partition alterPartition(String catName, String dbName, String tblName, List<String> partVals, Partition newPart, String validWriteIds) throws InvalidObjectException, MetaException { newPart = rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, validWriteIds); - // in case of event based cache update, cache will be updated during commit. - if (canUseEvents) { - return newPart; - } catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1606,10 +1282,6 @@ public Partition alterPartition(String catName, String dbName, String tblName, throws InvalidObjectException, MetaException { newParts = rawStore.alterPartitions( catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds); - // in case of event based cache update, cache will be updated during commit.
- if (canUseEvents) { - return newParts; - } catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1661,7 +1333,7 @@ public boolean getPartitionsByExpr(String catName, String dbName, String tblName catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts, result); } List partNames = new LinkedList<>(); @@ -1692,7 +1364,7 @@ public int getNumPartitionsByExpr(String catName, String dbName, String tblName, catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr); } String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); @@ -1726,7 +1398,7 @@ public int getNumPartitionsByExpr(String catName, String dbName, String tblName, catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1913,7 +1585,7 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName, groupNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1940,7 +1612,7 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName, groupNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1969,7 +1641,7 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.listPartitionNamesPs(catName, dbName, tblName, partSpecs, maxParts); } Table table = 
sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1998,7 +1670,7 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName)) { return rawStore.listPartitionsPsWithAuth(catName, dbName, tblName, partSpecs, maxParts, userName, groupNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -2128,8 +1800,7 @@ private static void updateTableColumnsStatsInternal(Configuration conf, ColumnSt throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { Map newParams = rawStore.updateTableColumnStatistics( colStats, validWriteIds, writeId); - // in case of event based cache update, cache will be updated during commit. - if (newParams != null && !canUseEvents) { + if (newParams != null) { updateTableColumnsStatsInternal(conf, colStats, newParams, null, writeId); } return newParams; @@ -2174,16 +1845,13 @@ public boolean deleteTableColumnStatistics(String catName, String dbName, String String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.deleteTableColumnStatistics(catName, dbName, tblName, colName); - // in case of event based cache update, cache is updated during commit txn - if (succ && !canUseEvents) { - catName = normalizeIdentifier(catName); - dbName = normalizeIdentifier(dbName); - tblName = normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { - return succ; - } - sharedCache.removeTableColStatsFromCache(catName, dbName, tblName, colName); + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; } + sharedCache.removeTableColStatsFromCache(catName, dbName, tblName, colName); return succ; } @@ -2193,20 +1861,17 @@ public boolean deleteTableColumnStatistics(String catName, String dbName, String throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { Map newParams = rawStore.updatePartitionColumnStatistics( colStats, partVals, validWriteIds, writeId); - // in case of event based cache update, cache is updated during commit txn - if (newParams != null && !canUseEvents) { - String catName = colStats.getStatsDesc().isSetCatName() ? - normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME; - String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); - String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); - if (!shouldCacheTable(catName, dbName, tblName)) { - return newParams; - } - Partition part = getPartition(catName, dbName, tblName, partVals); - part.setParameters(newParams); - sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, part); - sharedCache.updatePartitionColStatsInCache(catName, dbName, tblName, partVals, colStats.getStatsObj()); + String catName = colStats.getStatsDesc().isSetCatName() ? 
+ normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME; + String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); + String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); + if (!shouldCacheTable(catName, dbName, tblName)) { + return newParams; } + Partition part = getPartition(catName, dbName, tblName, partVals); + part.setParameters(newParams); + sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, part); + sharedCache.updatePartitionColStatsInCache(catName, dbName, tblName, partVals, colStats.getStatsObj()); return newParams; } @@ -2240,16 +1905,13 @@ public boolean deletePartitionColumnStatistics(String catName, String dbName, St throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName); - // in case of event based cache update, cache is updated during commit txn. - if (succ && !canUseEvents) { - catName = normalizeIdentifier(catName); - dbName = normalizeIdentifier(dbName); - tblName = normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { - return succ; - } - sharedCache.removePartitionColStatsFromCache(catName, dbName, tblName, partVals, colName); + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; } + sharedCache.removePartitionColStatsFromCache(catName, dbName, tblName, partVals, colName); return succ; } @@ -2271,7 +1933,7 @@ public AggrStats get_aggr_stats_for(String catName, String dbName, String tblNam // TODO: we currently cannot do transactional checks for stats here // (incl. due to lack of sync w.r.t. the below rawStore call). // In case the cache is updated using events, aggregate is calculated locally and thus can be read from cache. 
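The cache keeps two aggregate variants per table, as the refreshAggregateStatsInCache calls below show: StatsType.ALL over every partition, and StatsType.ALLBUTDEFAULT over every partition except the default one (default name "__HIVE_DEFAULT_PARTITION__"), whose placeholder values would skew the aggregate. A sketch of how the second partition list is derived, following the deleted updateTableAggregatePartitionColStats; the aggregate function is a stand-in for RawStore.get_aggr_stats_for:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public final class AggrStatsSketch {
  // Returns [ALL, ALLBUTDEFAULT] aggregates for the given partition names.
  static <S> List<S> bothAggregates(List<String> partNames, String defaultPartName,
      Function<List<String>, S> aggregate) {
    S all = aggregate.apply(partNames);            // StatsType.ALL
    List<String> rest = new ArrayList<>(partNames);
    rest.remove(defaultPartName);                  // e.g. "ds=__HIVE_DEFAULT_PARTITION__"
    S allButDefault = aggregate.apply(rest);       // StatsType.ALLBUTDEFAULT
    return Arrays.asList(all, allButDefault);
  }
}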
- if (!shouldCacheTable(catName, dbName, tblName) || (writeIdList != null && !canUseEvents)) { + if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null) { return rawStore.get_aggr_stats_for( catName, dbName, tblName, partNames, colNames, writeIdList); } @@ -2379,18 +2041,16 @@ private MergedColumnStatsForPartitions mergeColStatsForPartitions( List colAggrStats = MetaStoreServerUtils.aggrPartitionStats(colStatsMap, partNames, partsFound == partNames.size(), useDensityFunctionForNDVEstimation, ndvTuner); - if (canUseEvents) { - if (type == StatsType.ALL) { - sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), new AggrStats(colAggrStats, partsFound), - null, partNameToWriteId); - } else if (type == StatsType.ALLBUTDEFAULT) { - sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), null, - new AggrStats(colAggrStats, partsFound), partNameToWriteId); - } + if (type == StatsType.ALL) { + sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), new AggrStats(colAggrStats, partsFound), + null, partNameToWriteId); + } else if (type == StatsType.ALLBUTDEFAULT) { + sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), null, + new AggrStats(colAggrStats, partsFound), partNameToWriteId); } return new MergedColumnStatsForPartitions(colAggrStats, partsFound); } @@ -2706,10 +2366,6 @@ public int getDatabaseCount() throws MetaException { // TODO constraintCache List constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints); - // in case of event based cache update, cache is updated during commit. - if (canUseEvents) { - return constraintNames; - } String dbName = normalizeIdentifier(tbl.getDbName()); String tblName = normalizeIdentifier(tbl.getTableName()); String catName = tbl.isSetCatName() ? 
normalizeIdentifier(tbl.getCatName()) : @@ -3057,12 +2713,6 @@ static boolean isBlacklistWhitelistEmpty(Configuration conf) { && MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST).isEmpty(); } - @VisibleForTesting - void resetCatalogCache() { - sharedCache.resetCatalogCache(); - setCachePrewarmedState(false); - } - @Override public void addRuntimeStat(RuntimeStat stat) throws MetaException { rawStore.addRuntimeStat(stat); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 05cf70b..323b41c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -67,10 +67,6 @@ public class SharedCache { private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true); - private boolean isCatalogCachePrewarmed = false; - private Map catalogCache = new TreeMap<>(); - private HashSet catalogsDeletedDuringPrewarm = new HashSet<>(); - private AtomicBoolean isCatalogCacheDirty = new AtomicBoolean(false); // For caching Database objects. Key is database name private Map databaseCache = new TreeMap<>(); @@ -386,32 +382,6 @@ public void alterPartitions(List> partValsList, List new } } - public void refreshPartitions(List partitions, SharedCache sharedCache) { - Map newPartitionCache = new HashMap(); - try { - tableLock.writeLock().lock(); - for (Partition part : partitions) { - if (isPartitionCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition cache update for table: " + getTable().getTableName() - + "; the partition list we have is dirty."); - return; - } - String key = CacheUtils.buildPartitionCacheKey(part.getValues()); - PartitionWrapper wrapper = partitionCache.get(key); - if (wrapper != null) { - if (wrapper.getSdHash() != null) { - sharedCache.decrSd(wrapper.getSdHash()); - } - } - wrapper = makePartitionWrapper(part, sharedCache); - newPartitionCache.put(key, wrapper); - } - partitionCache = newPartitionCache; - } finally { - tableLock.writeLock().unlock(); - } - } - public boolean updateTableColStats(List colStatsForTable) { try { tableLock.writeLock().lock(); @@ -454,27 +424,6 @@ public boolean updateTableColStats(List colStatsForTable) { } } - public void refreshTableColStats(List colStatsForTable) { - Map newTableColStatsCache = - new HashMap(); - try { - tableLock.writeLock().lock(); - for (ColumnStatisticsObj colStatObj : colStatsForTable) { - if (isTableColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table col stats cache update for table: " - + getTable().getTableName() + "; the table col stats list we have is dirty."); - return; - } - String key = colStatObj.getColName(); - // TODO: get rid of deepCopy after making sure callers don't use references - newTableColStatsCache.put(key, colStatObj.deepCopy()); - } - tableColStatsCache = newTableColStatsCache; - } finally { - tableLock.writeLock().unlock(); - } - } - public ColumnStatistics getCachedTableColStats(ColumnStatisticsDesc csd, List colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException { @@ -675,42 +624,6 @@ public void removeAllPartitionColStats() { } } - public void refreshPartitionColStats(List partitionColStats) { - Map newPartitionColStatsCache 
= - new HashMap(); - try { - tableLock.writeLock().lock(); - String tableName = StringUtils.normalizeIdentifier(getTable().getTableName()); - for (ColumnStatistics cs : partitionColStats) { - if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition column stats cache update for table: " - + getTable().getTableName() + "; the partition column stats list we have is dirty"); - return; - } - List partVal; - try { - partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null); - List colStatsObjs = cs.getStatsObj(); - for (ColumnStatisticsObj colStatObj : colStatsObjs) { - if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition column stats cache update for table: " - + getTable().getTableName() + "; the partition column list we have is dirty"); - return; - } - String key = - CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName()); - newPartitionColStatsCache.put(key, colStatObj.deepCopy()); - } - } catch (MetaException e) { - LOG.debug("Unable to cache partition column stats for table: " + tableName, e); - } - } - partitionColStatsCache = newPartitionColStatsCache; - } finally { - tableLock.writeLock().unlock(); - } - } - public List getAggrPartitionColStats(List colNames, StatsType statsType) { List colStats = new ArrayList(); @@ -935,95 +848,6 @@ public ColumnStatisticsObj getColumnStatisticsObj() { } } - public void populateCatalogsInCache(Collection catalogs) { - for (Catalog cat : catalogs) { - Catalog catCopy = cat.deepCopy(); - // ObjectStore also stores db name in lowercase - catCopy.setName(catCopy.getName().toLowerCase()); - try { - cacheLock.writeLock().lock(); - // Since we allow write operations on cache while prewarm is happening: - // 1. Don't add databases that were deleted while we were preparing list for prewarm - // 2. 
Skip overwriting exisiting db object - // (which is present because it was added after prewarm started) - if (catalogsDeletedDuringPrewarm.contains(catCopy.getName())) { - continue; - } - catalogCache.putIfAbsent(catCopy.getName(), catCopy); - catalogsDeletedDuringPrewarm.clear(); - isCatalogCachePrewarmed = true; - } finally { - cacheLock.writeLock().unlock(); - } - } - } - - public Catalog getCatalogFromCache(String name) { - Catalog cat = null; - try { - cacheLock.readLock().lock(); - if (catalogCache.get(name) != null) { - cat = catalogCache.get(name).deepCopy(); - } - } finally { - cacheLock.readLock().unlock(); - } - return cat; - } - - public void addCatalogToCache(Catalog cat) { - try { - cacheLock.writeLock().lock(); - Catalog catCopy = cat.deepCopy(); - // ObjectStore also stores db name in lowercase - catCopy.setName(catCopy.getName().toLowerCase()); - catalogCache.put(cat.getName(), catCopy); - isCatalogCacheDirty.set(true); - } finally { - cacheLock.writeLock().unlock(); - } - } - - public void alterCatalogInCache(String catName, Catalog newCat) { - try { - cacheLock.writeLock().lock(); - removeCatalogFromCache(catName); - addCatalogToCache(newCat.deepCopy()); - } finally { - cacheLock.writeLock().unlock(); - } - } - - public void removeCatalogFromCache(String name) { - name = normalizeIdentifier(name); - try { - cacheLock.writeLock().lock(); - // If db cache is not yet prewarmed, add this to a set which the prewarm thread can check - // so that the prewarm thread does not add it back - if (!isCatalogCachePrewarmed) { - catalogsDeletedDuringPrewarm.add(name); - } - if (catalogCache.remove(name) != null) { - isCatalogCacheDirty.set(true); - } - } finally { - cacheLock.writeLock().unlock(); - } - } - - public List listCachedCatalogs() { - try { - cacheLock.readLock().lock(); - return new ArrayList<>(catalogCache.keySet()); - } finally { - cacheLock.readLock().unlock(); - } - } - - public boolean isCatalogCachePrewarmed() { - return isCatalogCachePrewarmed; - } - public Database getDatabaseFromCache(String catName, String name) { Database db = null; try { @@ -1147,23 +971,6 @@ public void alterDatabaseInCache(String catName, String dbName, Database newDb) } } - public boolean refreshDatabasesInCache(List databases) { - if (isDatabaseCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping database cache update; the database list we have is dirty."); - return false; - } - try { - cacheLock.writeLock().lock(); - databaseCache.clear(); - for (Database db : databases) { - addDatabaseToCache(db); - } - return true; - } finally { - cacheLock.writeLock().unlock(); - } - } - public int getCachedDatabaseCount() { try { cacheLock.readLock().lock(); @@ -1445,38 +1252,6 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN return tableNames; } - public boolean refreshTablesInCache(String catName, String dbName, List
tables) { - if (isTableCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table cache update; the table list we have is dirty."); - return false; - } - Map newCacheForDB = new TreeMap<>(); - for (Table tbl : tables) { - String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - tblWrapper.updateTableObj(tbl, this); - } else { - tblWrapper = createTableWrapper(catName, dbName, tblName, tbl); - } - newCacheForDB.put(CacheUtils.buildTableKey(catName, dbName, tblName), tblWrapper); - } - try { - cacheLock.writeLock().lock(); - Iterator> entryIterator = tableCache.entrySet().iterator(); - while (entryIterator.hasNext()) { - String key = entryIterator.next().getKey(); - if (key.startsWith(CacheUtils.buildDbKeyWithDelimiterSuffix(catName, dbName))) { - entryIterator.remove(); - } - } - tableCache.putAll(newCacheForDB); - return true; - } finally { - cacheLock.writeLock().unlock(); - } - } - public ColumnStatistics getTableColStatsFromCache(String catName, String dbName, String tblName, List colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException { try { @@ -1508,20 +1283,6 @@ public void removeTableColStatsFromCache(String catName, String dbName, String t } } - public void removeAllTableColStatsFromCache(String catName, String dbName, String tblName) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - tblWrapper.removeAllTableColStats(); - } else { - LOG.info("Table " + tblName + " is missing from cache."); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public void updateTableColStatsInCache(String catName, String dbName, String tableName, List colStatsForTable) { try { @@ -1538,22 +1299,6 @@ public void updateTableColStatsInCache(String catName, String dbName, String tab } } - public void refreshTableColStatsInCache(String catName, String dbName, String tableName, - List colStatsForTable) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = - tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName)); - if (tblWrapper != null) { - tblWrapper.refreshTableColStats(colStatsForTable); - } else { - LOG.info("Table " + tableName + " is missing from cache."); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public int getCachedTableCount() { try { cacheLock.readLock().lock(); @@ -1727,19 +1472,6 @@ public void alterPartitionsInCache(String catName, String dbName, String tblName } } - public void refreshPartitionsInCache(String catName, String dbName, String tblName, - List partitions) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - tblWrapper.refreshPartitions(partitions, this); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public void removePartitionColStatsFromCache(String catName, String dbName, String tblName, List partVals, String colName) { try { @@ -1753,18 +1485,6 @@ public void removePartitionColStatsFromCache(String catName, String dbName, Stri } } - public void removeAllPartitionColStatsFromCache(String catName, String dbName, String tblName) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - 
tblWrapper.removeAllPartitionColStats(); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public void updatePartitionColStatsInCache(String catName, String dbName, String tableName, List partVals, List colStatsObjs) { try { @@ -1812,19 +1532,6 @@ public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, Strin return colStatObjs; } - public void refreshPartitionColStatsInCache(String catName, String dbName, String tblName, - List partitionColStats) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - tblWrapper.refreshPartitionColStats(partitionColStats); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public List getAggrStatsFromCache(String catName, String dbName, String tblName, List colNames, StatsType statsType) { try { @@ -1909,18 +1616,7 @@ public synchronized StorageDescriptor getSdFromCache(byte[] sdHash) { return sdCache; } - /** - * This resets the contents of the cataog cache so that we can re-fill it in another test. - */ - void resetCatalogCache() { - isCatalogCachePrewarmed = false; - catalogCache.clear(); - catalogsDeletedDuringPrewarm.clear(); - isCatalogCacheDirty.set(false); - } - void clearDirtyFlags() { - isCatalogCacheDirty.set(false); isDatabaseCacheDirty.set(false); isTableCacheDirty.set(false); } diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCatalogCaching.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCatalogCaching.java deleted file mode 100644 index 423dce8..0000000 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCatalogCaching.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.cache; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.HiveMetaStore; -import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; -import org.apache.hadoop.hive.metastore.ObjectStore; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest; -import org.apache.hadoop.hive.metastore.api.Catalog; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.client.builder.CatalogBuilder; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.util.Comparator; -import java.util.List; - -/** - * Tests that catalogs are properly cached. - */ -@Category(MetastoreCheckinTest.class) -public class TestCatalogCaching { - private static final String CAT1_NAME = "cat1"; - private static final String CAT2_NAME = "cat2"; - - private ObjectStore objectStore; - private Configuration conf; - private CachedStore cachedStore; - - @Before - public void createObjectStore() throws MetaException, InvalidOperationException { - conf = MetastoreConf.newMetastoreConf(); - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); - MetaStoreTestUtils.setConfForStandloneMode(conf); - objectStore = new ObjectStore(); - objectStore.setConf(conf); - - // Create three catalogs - HiveMetaStore.HMSHandler.createDefaultCatalog(objectStore, new Warehouse(conf)); - - Catalog cat1 = new CatalogBuilder() - .setName(CAT1_NAME) - .setLocation("/tmp/cat1") - .build(); - objectStore.createCatalog(cat1); - Catalog cat2 = new CatalogBuilder() - .setName(CAT2_NAME) - .setLocation("/tmp/cat2") - .build(); - objectStore.createCatalog(cat2); - } - - @After - public void clearCatalogCache() throws MetaException, NoSuchObjectException { - List catalogs = objectStore.getCatalogs(); - for (String catalog : catalogs) objectStore.dropCatalog(catalog); - } - - @Test - public void defaultHiveOnly() throws Exception { - // By default just the Hive catalog should be cached. - cachedStore = new CachedStore(); - cachedStore.setConf(conf); - CachedStore.stopCacheUpdateService(1); - cachedStore.resetCatalogCache(); - - CachedStore.prewarm(objectStore); - - // Only the hive catalog should be cached - List cachedCatalogs = cachedStore.getCatalogs(); - Assert.assertEquals(1, cachedCatalogs.size()); - Assert.assertEquals(Warehouse.DEFAULT_CATALOG_NAME, cachedCatalogs.get(0)); - } - - @Test - public void cacheAll() throws Exception { - // Set the config value to empty string, which should result in all catalogs being cached. 
- Configuration newConf = new Configuration(conf); - MetastoreConf.setVar(newConf, MetastoreConf.ConfVars.CATALOGS_TO_CACHE, ""); - cachedStore = new CachedStore(); - cachedStore.setConf(newConf); - CachedStore.stopCacheUpdateService(1); - objectStore.setConf(newConf); // have to override it with the new conf since this is where - // prewarm gets the conf object - cachedStore.resetCatalogCache(); - - CachedStore.prewarm(objectStore); - - // All the catalogs should be cached - List cachedCatalogs = cachedStore.getCatalogs(); - Assert.assertEquals(3, cachedCatalogs.size()); - cachedCatalogs.sort(Comparator.naturalOrder()); - Assert.assertEquals(CAT1_NAME, cachedCatalogs.get(0)); - Assert.assertEquals(CAT2_NAME, cachedCatalogs.get(1)); - Assert.assertEquals(Warehouse.DEFAULT_CATALOG_NAME, cachedCatalogs.get(2)); - } - - @Test - public void cacheSome() throws Exception { - // Set the config value to 2 catalogs other than hive - Configuration newConf = new Configuration(conf); - MetastoreConf.setVar(newConf, MetastoreConf.ConfVars.CATALOGS_TO_CACHE, CAT1_NAME + "," + CAT2_NAME); - cachedStore = new CachedStore(); - cachedStore.setConf(newConf); - CachedStore.stopCacheUpdateService(1); - objectStore.setConf(newConf); // have to override it with the new conf since this is where - // prewarm gets the conf object - cachedStore.resetCatalogCache(); - - CachedStore.prewarm(objectStore); - - // All the catalogs should be cached - List cachedCatalogs = cachedStore.getCatalogs(); - Assert.assertEquals(2, cachedCatalogs.size()); - cachedCatalogs.sort(Comparator.naturalOrder()); - Assert.assertEquals(CAT1_NAME, cachedCatalogs.get(0)); - Assert.assertEquals(CAT2_NAME, cachedCatalogs.get(1)); - } -}
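After this commit the only background maintenance left is the event path: the update thread hands off to triggerUpdateUsingEvent(rawStore), which tails the metastore notification log from lastEventId and applies each event to the SharedCache, which is also why setConf now refuses to start without a TransactionalMetaStoreEventListener to populate that log. A minimal sketch of such a consumption loop; EventLog and Event are hypothetical stand-ins for the notification-log API, not metastore classes:

import java.util.List;

public final class EventTailSketch {
  public interface EventLog {
    // Events with id greater than sinceId, oldest first; stands in for the
    // notification log that transactional event listeners append to.
    List<Event> eventsSince(long sinceId);
  }

  public static final class Event {
    final long id;
    final Runnable applyToCache;        // e.g. add/alter/drop one cached object
    public Event(long id, Runnable applyToCache) {
      this.id = id;
      this.applyToCache = applyToCache;
    }
  }

  private long lastEventId;             // mirrors CachedStore.lastEventId

  // One catch-up round: apply new events in order, then advance the
  // high-water mark so the next round resumes where this one stopped.
  public void catchUp(EventLog log) {
    for (Event e : log.eventsSince(lastEventId)) {
      e.applyToCache.run();
      lastEventId = e.id;
    }
  }
}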