diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 049e837..9ad354f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1318,7 +1318,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal * @deprecated Use MetastoreConf.RAW_STORE_IMPL */ @Deprecated - METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore", + METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.cache.CachedStore", "Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" + "This class is used to store and retrieval of raw metadata objects such as table, database"), /** diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index e611394..1690413 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -759,7 +759,8 @@ public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumn .buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(), updateTableColumnStatEvent.getTableObj(), updateTableColumnStatEvent.getTableParameters(), - updateTableColumnStatEvent.getWriteId()); + updateTableColumnStatEvent.getWriteId(), + updateTableColumnStatEvent.getWriteIds()); NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(), msgEncoder.getSerializer().serialize(msg)); ColumnStatisticsDesc statDesc = updateTableColumnStatEvent.getColStats().getStatsDesc(); @@ -789,7 +790,8 @@ public void onUpdatePartitionColumnStat(UpdatePartitionColumnStatEvent updatePar updatePartColStatEvent.getPartVals(), updatePartColStatEvent.getPartParameters(), updatePartColStatEvent.getTableObj(), - updatePartColStatEvent.getWriteId()); + updatePartColStatEvent.getWriteId(), + updatePartColStatEvent.getWriteIds()); NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_PARTITION_COLUMN_STAT.toString(), msgEncoder.getSerializer().serialize(msg)); ColumnStatisticsDesc statDesc = updatePartColStatEvent.getPartColStats().getStatsDesc(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java index cdfc60c..3d27945 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java @@ -10,6 +10,7 @@ import org.apache.hadoop.hive.metastore.*; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.cache.CachedStore.MergedColumnStatsForPartitions; import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; @@ -44,7 +45,6 @@ public void setUp() throws Exception { 
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); MetastoreConf.setVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS, DbNotificationListener.class.getName()); MetastoreConf.setVar(conf, ConfVars.RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.cache.CachedStore"); - MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true); MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true); MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false); MetaStoreTestUtils.setConfForStandloneMode(conf); @@ -383,8 +383,8 @@ public void testPartitionOpsForUpdateUsingEvents() throws Exception { sharedCache.getSdCache().clear(); } - private void updateTableColStats(String dbName, String tblName, String[] colName, - double highValue, double avgColLen, boolean isTxnTable) throws Throwable { + private long updateTableColStats(String dbName, String tblName, String[] colName, + double highValue, double avgColLen, boolean isTxnTable, long lastEventId) throws Throwable { long writeId = -1; String validWriteIds = null; if (isTxnTable) { @@ -408,6 +408,7 @@ private void updateTableColStats(String dbName, String tblName, String[] colName // write stats objs persistently hmsHandler.update_table_column_statistics_req(setTblColStat); + lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId); validateTablePara(dbName, tblName); ColumnStatistics colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, @@ -419,10 +420,11 @@ private void updateTableColStats(String dbName, String tblName, String[] colName dbName, tblName, Lists.newArrayList(colName[1]), validWriteIds, true); Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), colName[1]); verifyStatString(colStatsCache.getStatsObj().get(0), colName[1], avgColLen); + return lastEventId; } - private void updatePartColStats(String dbName, String tblName, boolean isTxnTable, String[] colName, - String partName, double highValue, double avgColLen) throws Throwable { + private long updatePartColStats(String dbName, String tblName, boolean isTxnTable, String[] colName, + String partName, double highValue, double avgColLen, long lastEventId) throws Throwable { long writeId = -1; String validWriteIds = null; List txnIds = null; @@ -467,7 +469,7 @@ private void updatePartColStats(String dbName, String tblName, boolean isTxnTabl } else { Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), false); } - + lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId); List statSharedCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds, true); @@ -485,6 +487,8 @@ private void updatePartColStats(String dbName, String tblName, boolean isTxnTabl statPartCache = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, CachedStore.partNameToVals(partName), colName[1], validWriteIds); verifyStatString(statPartCache.getColumnStatisticsObj(), colName[1], avgColLen); + + return lastEventId; } private List getStatsObjects(String dbName, String tblName, String[] colName, @@ -702,9 +706,10 @@ private void testTableColStatInternal(String dbName, String tblName, boolean isT String[] colName = new String[]{"income", "name"}; double highValue = 1200000.4525; double avgColLen = 50.30; + long lastEventId = 0; setUpBeforeTest(dbName, tblName, colName, isTxnTable); - 
updateTableColStats(dbName, tblName, colName, highValue, avgColLen, isTxnTable); + lastEventId = updateTableColStats(dbName, tblName, colName, highValue, avgColLen, isTxnTable, lastEventId); if (!isTxnTable) { deleteColStats(dbName, tblName, colName); } @@ -713,7 +718,7 @@ private void testTableColStatInternal(String dbName, String tblName, boolean isT createTableWithPart(dbName, tblName, colName, isTxnTable); List partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1); String partName = partitions.get(0); - updatePartColStats(dbName, tblName, isTxnTable, colName, partName, highValue, avgColLen); + lastEventId = updatePartColStats(dbName, tblName, isTxnTable, colName, partName, highValue, avgColLen, lastEventId); if (!isTxnTable) { deletePartColStats(dbName, tblName, colName, partName); } @@ -745,9 +750,10 @@ public void testTableColumnStatisticsTxnTableMulti() throws Throwable { createTableWithPart(dbName, tblName, colName, true); List partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1); String partName = partitions.get(0); - updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen); - updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen); - updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78); + long lastEventId = 0; + lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen, lastEventId); + lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen, lastEventId); + lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78, lastEventId); } @Test @@ -757,6 +763,7 @@ public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable { String[] colName = new String[]{"income", "name"}; double highValue = 1200000.4525; double avgColLen = 50.30; + long lastEventId = 0; setUpBeforeTest(dbName, null, colName, true); createTableWithPart(dbName, tblName, colName, true); @@ -800,6 +807,7 @@ public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable { verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, avgColLen); Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false); + lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId); List statsListFromCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds, true); @@ -820,6 +828,7 @@ public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable { String[] colName = new String[]{"income", "name"}; double highValue = 1200000.4121; double avgColLen = 23.30; + long lastEventId = 0; setUpBeforeTest(dbName, null, colName, true); createTableWithPart(dbName, tblName, colName, true); @@ -827,7 +836,7 @@ public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable { String partName = partitions.get(0); // update part col stats successfully. 
- updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2); + lastEventId = updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2, lastEventId); List txnIds = allocateTxns(1); long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId(); @@ -850,6 +859,7 @@ public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable { // write stats objs persistently hmsHandler.update_partition_column_statistics_req(setTblColStat); + lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId); // keep the txn open and verify that the stats got is not compliant. @@ -900,9 +910,9 @@ private void verifyAggrStat(String dbName, String tblName, String[] colName, Lis Assert.assertEquals(aggrStatsCached, aggrStats); //Assert.assertEquals(aggrStatsCached.isIsStatsCompliant(), true); - List stats = sharedCache.getAggrStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, - Collections.singletonList(colName[0]), SharedCache.StatsType.ALL); - Assert.assertEquals(stats.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01); + MergedColumnStatsForPartitions stats = CachedStore.mergeColStatsForPartitions(DEFAULT_CATALOG_NAME, dbName, tblName, Lists.newArrayList("income=1", "income=2"), + Collections.singletonList(colName[0]), sharedCache, SharedCache.StatsType.ALL, validWriteIds, false, 0.0); + Assert.assertEquals(stats.colStats.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01); } @Test @@ -917,11 +927,13 @@ public void testAggrStat() throws Throwable { String partName = partitions.get(0); // update part col stats successfully. - updatePartColStats(dbName, tblName, false, colName, partitions.get(0), 2, 12); - updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 4, 10); + long lastEventId = 0; + lastEventId = updatePartColStats(dbName, tblName, false, colName, partitions.get(0), 2, 12, lastEventId); + lastEventId = updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 4, 10, lastEventId); + lastEventId = CachedStore.updateUsingNotificationEvents(rawStore, lastEventId); verifyAggrStat(dbName, tblName, colName, partitions, false, 4); - updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 3, 10); + lastEventId = updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 3, 10, lastEventId); verifyAggrStat(dbName, tblName, colName, partitions, false, 3); } @@ -930,6 +942,7 @@ public void testAggrStatTxnTable() throws Throwable { String dbName = "aggr_stats_test_db_txn"; String tblName = "tbl_part"; String[] colName = new String[]{"income", "name"}; + long lastEventId = 0; setUpBeforeTest(dbName, null, colName, true); createTableWithPart(dbName, tblName, colName, true); @@ -937,11 +950,11 @@ public void testAggrStatTxnTable() throws Throwable { String partName = partitions.get(0); // update part col stats successfully. 
- updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12); - updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10); + lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12, lastEventId); + lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10, lastEventId); verifyAggrStat(dbName, tblName, colName, partitions, true, 4); - updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 10); + lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 10, lastEventId); verifyAggrStat(dbName, tblName, colName, partitions, true, 3); List txnIds = allocateTxns(1); @@ -984,6 +997,7 @@ public void testAggrStatAbortTxn() throws Throwable { String dbName = "aggr_stats_test_db_txn_abort"; String tblName = "tbl_part"; String[] colName = new String[]{"income", "name"}; + long lastEventId = 0; setUpBeforeTest(dbName, null, colName, true); createTableWithPart(dbName, tblName, colName, true); @@ -991,8 +1005,8 @@ public void testAggrStatAbortTxn() throws Throwable { String partName = partitions.get(0); // update part col stats successfully. - updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12); - updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10); + lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12, lastEventId); + lastEventId = updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10, lastEventId); verifyAggrStat(dbName, tblName, colName, partitions, true, 4); List txnIds = allocateTxns(4); diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index d2c2ccd..b60d5aa 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -157,6 +157,7 @@ public void initConf() throws Exception { } // Plug verifying metastore in for testing DirectSQL. + conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.cache.CachedStore"); conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); miniClusters.initConf(conf); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index bac9d01..083c0d4 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -349,9 +349,6 @@ public static ConfVars getMetaConf(String name) { CATALOG_DEFAULT("metastore.catalog.default", "metastore.catalog.default", "hive", "The default catalog to use when a catalog is not specified. Default is 'hive' (the " + "default catalog)."), - CATALOGS_TO_CACHE("metastore.cached.rawstore.catalogs", "metastore.cached.rawstore.catalogs", - "hive", "Comma separated list of catalogs to cache in the CachedStore. Default is 'hive' " + - "(the default catalog). 
Empty string means all catalogs will be cached."), CLIENT_CONNECT_RETRY_DELAY("metastore.client.connect.retry.delay", "hive.metastore.client.connect.retry.delay", 1, TimeUnit.SECONDS, "Number of seconds for the client to wait between consecutive connection attempts"), @@ -975,8 +972,6 @@ public static ConfVars getMetaConf(String name) { "Time interval describing how often the reaper runs"), TOKEN_SIGNATURE("metastore.token.signature", "hive.metastore.token.signature", "", "The delegation token service name to match when selecting a token from the current user's tokens."), - METASTORE_CACHE_CAN_USE_EVENT("metastore.cache.can.use.event", "hive.metastore.cache.can.use.event", false, - "Can notification events from notification log table be used for updating the metastore cache."), TRANSACTIONAL_EVENT_LISTENERS("metastore.transactional.event.listeners", "hive.metastore.transactional.event.listeners", "", "A comma separated list of Java classes that implement the org.apache.riven.MetaStoreEventListener" + diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 1a694fb..37d339a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -583,19 +583,6 @@ public void init() throws MetaException { listeners.add(new HMSMetricsListener(conf)); } - boolean canCachedStoreCanUseEvent = false; - for (MetaStoreEventListener listener : transactionalListeners) { - if (listener.doesAddEventsToNotificationLogTable()) { - canCachedStoreCanUseEvent = true; - break; - } - } - if (conf.getBoolean(ConfVars.METASTORE_CACHE_CAN_USE_EVENT.getVarname(), false) && - !canCachedStoreCanUseEvent) { - throw new MetaException("CahcedStore can not use events for invalidation as there is no " + - " TransactionalMetaStoreEventListener to add events to notification table"); - } - endFunctionListeners = MetaStoreServerUtils.getMetaStoreListeners( MetaStoreEndFunctionListener.class, conf, MetastoreConf.getVar(conf, ConfVars.END_FUNCTION_LISTENERS)); @@ -6090,13 +6077,13 @@ private boolean updateTableColumnStatsInternal(ColumnStatistics colStats, MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.UPDATE_TABLE_COLUMN_STAT, new UpdateTableColumnStatEvent(colStats, tableObj, parameters, - writeId, this)); + writeId, validWriteIds, this)); } if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, EventType.UPDATE_TABLE_COLUMN_STAT, new UpdateTableColumnStatEvent(colStats, tableObj, parameters, - writeId,this)); + writeId, validWriteIds, this)); } } committed = getMS().commitTransaction(); @@ -6156,13 +6143,13 @@ private boolean updatePartitonColStatsInternal(Table tbl, ColumnStatistics colSt MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.UPDATE_PARTITION_COLUMN_STAT, new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, - writeId, this)); + writeId, validWriteIds, this)); } if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, EventType.UPDATE_PARTITION_COLUMN_STAT, new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, - writeId, this)); + writeId, validWriteIds, this)); } } committed = getMS().commitTransaction(); diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index a5d0c04..2a33b76 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -121,13 +121,18 @@ private static boolean areTxnStatsSupported; private PartitionExpressionProxy expressionProxy = null; private static SharedCache sharedCache = new SharedCache(); - private static boolean canUseEvents = false; private static long lastEventId; + private static boolean catchingup = false; static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); @Override public void setConf(Configuration conf) { + if (conf.get(ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getVarname()) == null || + conf.get(ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getVarname()).isEmpty()) { + throw new RuntimeException("CachedStore cannot use events for invalidation as there is no " + + "TransactionalMetaStoreEventListener to add events to the notification table"); + } setConfInternal(conf); initBlackListWhiteList(conf); initSharedCache(conf); @@ -145,6 +150,12 @@ void setConfForTest(Configuration conf) { initSharedCache(conf); } + private static void triggerUpdateUsingEventAdhoc(RawStore rawStore) { + catchingup = true; + triggerUpdateUsingEvent(rawStore); + catchingup = false; + } + synchronized private static void triggerUpdateUsingEvent(RawStore rawStore) { if (!isCachePrewarmed.get()) { LOG.error("cache update should be done only after prewarm"); @@ -170,13 +181,6 @@ synchronized private static void triggerPreWarm(RawStore rawStore) { } private void setConfInternal(Configuration conf) { - if (MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT)) { - canUseEvents = true; - } else { - canUseEvents = false; - } - LOG.info("canUseEvents is set to " + canUseEvents + " in cached Store"); - String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName()); if (rawStore == null) { @@ -252,6 +256,7 @@ static private void updateStatsForAlterTable(RawStore rawStore, Table tblBefore, @VisibleForTesting public static long updateUsingNotificationEvents(RawStore rawStore, long lastEventId) throws Exception { + Deadline.registerIfNot(1000000); LOG.debug("updating cache using notification events starting from event id " + lastEventId); NotificationEventRequest rqst = new NotificationEventRequest(lastEventId); @@ -375,6 +380,32 @@ public static long updateUsingNotificationEvents(RawStore rawStore, long lastEve break; case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT: UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message); + Table tbl = msg.getTableObject(); + Map<String, String> newParams = new HashMap<>(tbl.getParameters()); + List<String> colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj : msg.getColumnStatistics().getStatsObj()) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(newParams, colNames); + long writeId = msg.getWriteId(); + String validWriteIds = msg.getWriteIds(); + if (validWriteIds != null) { + if (!areTxnStatsSupported) { + StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); + } else { + String errorMsg = ObjectStore.verifyStatsChangeCtx(TableName.getDbTable(dbName,
tableName), + tbl.getParameters(), newParams, writeId, validWriteIds, true); + if (errorMsg != null) { + throw new MetaException(errorMsg); + } + if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, writeId, validWriteIds, true)) { + // Make sure we set the flag to invalid regardless of the current value. + StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + + dbName + "." + tableName); + } + } + } sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, msg.getWriteId(), msg.getColumnStatistics().getStatsObj(), msg.getParameters()); break; @@ -384,6 +415,32 @@ public static long updateUsingNotificationEvents(RawStore rawStore, long lastEve break; case MessageBuilder.UPDATE_PART_COL_STAT_EVENT: UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message); + Partition partition = sharedCache.getPartitionFromCache(catalogName, dbName, tableName, msgPartUpdate.getPartVals()); + newParams = new HashMap<>(partition.getParameters()); + colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj : msgPartUpdate.getColumnStatistics().getStatsObj()) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(newParams, colNames); + writeId = msgPartUpdate.getWriteId(); + validWriteIds = msgPartUpdate.getWriteIds(); + if (validWriteIds != null) { + if (!areTxnStatsSupported) { + StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); + } else { + String errorMsg = ObjectStore.verifyStatsChangeCtx(TableName.getDbTable(dbName, tableName), + partition.getParameters(), newParams, writeId, validWriteIds, true); + if (errorMsg != null) { + throw new MetaException(errorMsg); + } + if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, writeId, validWriteIds, true)) { + // Make sure we set the flag to invalid regardless of the current value. + StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition " + + dbName + "." + tableName + "." 
+ msgPartUpdate.getPartVals()); + } + } + } sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getWriteId(), msgPartUpdate.getPartVals(), msgPartUpdate.getParameters(), msgPartUpdate.getColumnStatistics().getStatsObj()); @@ -417,15 +474,9 @@ static void prewarm(RawStore rawStore) { Deadline.registerIfNot(1000000); Collection<String> catalogsToCache; try { - catalogsToCache = catalogsToCache(rawStore); - LOG.info("Going to cache catalogs: " + org.apache.commons.lang.StringUtils.join(catalogsToCache, ", ")); - List<Catalog> catalogs = new ArrayList<>(catalogsToCache.size()); - for (String catName : catalogsToCache) { - catalogs.add(rawStore.getCatalog(catName)); - } - sharedCache.populateCatalogsInCache(catalogs); - } catch (MetaException | NoSuchObjectException e) { - LOG.warn("Failed to populate catalogs in cache, going to try again", e); + catalogsToCache = rawStore.getCatalogs(); + } catch (MetaException e) { + LOG.warn("Failed to get catalogs, going to try again", e); try { Thread.sleep(sleepTime); sleepTime = sleepTime * 2; @@ -435,7 +486,7 @@ // try again continue; } - LOG.info("Finished prewarming catalogs, starting on databases"); + LOG.info("Finished getting catalogs, starting to cache databases"); List<Database> databases = new ArrayList<>(); for (String catName : catalogsToCache) { try { @@ -617,17 +668,6 @@ private static void initBlackListWhiteList(Configuration conf) { MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)); } - private static Collection<String> catalogsToCache(RawStore rs) throws MetaException { - Collection<String> confValue = - MetastoreConf.getStringCollection(rs.getConf(), ConfVars.CATALOGS_TO_CACHE); - if (confValue == null || confValue.isEmpty() || - (confValue.size() == 1 && confValue.contains(""))) { - return rs.getCatalogs(); - } else { - return confValue; - } - } - @VisibleForTesting /** * This starts a background thread, which initially populates the SharedCache and later @@ -711,20 +751,12 @@ static void setCacheRefreshPeriod(long time) { @Override public void run() { if (!shouldRunPrewarm) { - if (canUseEvents) { - try { - triggerUpdateUsingEvent(rawStore); - } catch (Exception e) { - LOG.error("failed to update cache using events ", e); - } - } else { - // TODO: prewarm and update can probably be merged. - try { - update(); - } catch (Exception e) { - LOG.error("periodical refresh fail ", e); - } + try { + triggerUpdateUsingEvent(rawStore); + } catch (Exception e) { + LOG.error("failed to update cache using events ", e); + } + sharedCache.incrementUpdateCount(); } else { try { triggerPreWarm(rawStore); @@ -736,222 +768,6 @@ public void run() { } } - void update() { - Deadline.registerIfNot(1000000); - LOG.debug("CachedStore: updating cached objects. 
Shared cache has been update {} times so far.", - sharedCache.getUpdateCount()); - try { - for (String catName : catalogsToCache(rawStore)) { - List dbNames = rawStore.getAllDatabases(catName); - // Update the database in cache - updateDatabases(rawStore, catName, dbNames); - for (String dbName : dbNames) { - // Update the tables in cache - updateTables(rawStore, catName, dbName); - List tblNames; - try { - tblNames = rawStore.getAllTables(catName, dbName); - } catch (MetaException e) { - LOG.debug(ExceptionUtils.getStackTrace(e)); - // Continue with next database - continue; - } - for (String tblName : tblNames) { - if (!shouldCacheTable(catName, dbName, tblName)) { - continue; - } - // Update the table column stats for a table in cache - updateTableColStats(rawStore, catName, dbName, tblName); - // Update the partitions for a table in cache - updateTablePartitions(rawStore, catName, dbName, tblName); - // Update the partition col stats for a table in cache - updateTablePartitionColStats(rawStore, catName, dbName, tblName); - // Update aggregate partition column stats for a table in cache - updateTableAggregatePartitionColStats(rawStore, catName, dbName, tblName); - } - } - } - sharedCache.incrementUpdateCount(); - LOG.debug("CachedStore: updated cached objects. Shared cache update count is: {}", - sharedCache.getUpdateCount()); - } catch (MetaException e) { - LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e); - } - } - - private void updateDatabases(RawStore rawStore, String catName, List dbNames) { - LOG.debug("CachedStore: updating cached database objects for catalog: {}", catName); - boolean success = false; - // Try MAX_RETRIES times, then move to next method - int maxTries = MAX_RETRIES; - while (!success && (maxTries-- > 0)) { - // Prepare the list of databases - List databases = new ArrayList<>(); - for (String dbName : dbNames) { - Database db; - try { - db = rawStore.getDatabase(catName, dbName); - databases.add(db); - } catch (NoSuchObjectException e) { - LOG.info("Updating CachedStore: database: " + catName + "." 
+ dbName + " does not exist.", e); - } - } - success = sharedCache.refreshDatabasesInCache(databases); - LOG.debug("CachedStore: updated cached database objects for catalog: {}", catName); - } - } - - private void updateTables(RawStore rawStore, String catName, String dbName) { - LOG.debug("CachedStore: updating cached table objects for catalog: {}, database: {}", catName, dbName); - boolean success = false; - // Try MAX_RETRIES times, then move to next method - int maxTries = MAX_RETRIES; - while (!success && (maxTries-- > 0)) { - List tables = new ArrayList<>(); - try { - List tblNames = rawStore.getAllTables(catName, dbName); - for (String tblName : tblNames) { - if (!shouldCacheTable(catName, dbName, tblName)) { - continue; - } - Table table = rawStore.getTable(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName)); - tables.add(table); - } - success = sharedCache.refreshTablesInCache(catName, dbName, tables); - LOG.debug("CachedStore: updated cached table objects for catalog: {}, database: {}", catName, dbName); - } catch (MetaException e) { - LOG.debug("Unable to refresh cached tables for database: " + dbName, e); - } - } - } - - private void updateTableColStats(RawStore rawStore, String catName, String dbName, String tblName) { - LOG.debug("CachedStore: updating cached table col stats objects for catalog: {}, database: {}", catName, dbName); - boolean committed = false; - rawStore.openTransaction(); - try { - Table table = rawStore.getTable(catName, dbName, tblName); - if (table != null && !table.isSetPartitionKeys()) { - List colNames = MetaStoreUtils.getColumnNamesForTable(table); - Deadline.startTimer("getTableColumnStatistics"); - ColumnStatistics tableColStats = rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); - Deadline.stopTimer(); - if (tableColStats != null) { - sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), - tableColStats.getStatsObj()); - // Update the table to get consistent stats state. 
- sharedCache.alterTableInCache(catName, dbName, tblName, table); - } - } - committed = rawStore.commitTransaction(); - LOG.debug("CachedStore: updated cached table col stats objects for catalog: {}, database: {}", catName, dbName); - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Unable to refresh table column stats for table: " + tblName, e); - } finally { - if (!committed) { - sharedCache.removeAllTableColStatsFromCache(catName, dbName, tblName); - rawStore.rollbackTransaction(); - } - } - } - - private void updateTablePartitions(RawStore rawStore, String catName, String dbName, String tblName) { - LOG.debug("CachedStore: updating cached partition objects for catalog: {}, database: {}, table: {}", catName, - dbName, tblName); - try { - Deadline.startTimer("getPartitions"); - List partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE); - Deadline.stopTimer(); - sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), partitions); - LOG.debug("CachedStore: updated cached partition objects for catalog: {}, database: {}, table: {}", catName, - dbName, tblName); - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); - } - } - - private void updateTablePartitionColStats(RawStore rawStore, String catName, String dbName, String tblName) { - LOG.debug("CachedStore: updating cached partition col stats objects for catalog: {}, database: {}, table: {}", - catName, dbName, tblName); - boolean committed = false; - rawStore.openTransaction(); - try { - Table table = rawStore.getTable(catName, dbName, tblName); - if (table != null) { - List colNames = MetaStoreUtils.getColumnNamesForTable(table); - List partNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); - // Get partition column stats for this table - Deadline.startTimer("getPartitionColumnStatistics"); - List partitionColStats = - rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - sharedCache.refreshPartitionColStatsInCache(catName, dbName, tblName, partitionColStats); - Deadline.startTimer("getPartitionsByNames"); - List parts = rawStore.getPartitionsByNames(catName, dbName, tblName, partNames); - Deadline.stopTimer(); - // Also save partitions for consistency as they have the stats state. 
- for (Partition part : parts) { - sharedCache.alterPartitionInCache(catName, dbName, tblName, part.getValues(), part); - } - } - committed = rawStore.commitTransaction(); - LOG.debug("CachedStore: updated cached partition col stats objects for catalog: {}, database: {}, table: {}", - catName, dbName, tblName); - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); - } finally { - if (!committed) { - sharedCache.removeAllPartitionColStatsFromCache(catName, dbName, tblName); - rawStore.rollbackTransaction(); - } - } - } - - // Update cached aggregate stats for all partitions of a table and for all - // but default partition - private static void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName, - String tblName) { - LOG.debug("CachedStore: updating cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}", - catName, dbName, tblName); - try { - Table table = rawStore.getTable(catName, dbName, tblName); - if (table == null) { - return; - } - List partNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); - List colNames = MetaStoreUtils.getColumnNamesForTable(table); - if ((partNames != null) && (partNames.size() > 0)) { - Deadline.startTimer("getAggregareStatsForAllPartitions"); - AggrStats aggrStatsAllPartitions = rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - // Remove default partition from partition names and get aggregate stats again - List partKeys = table.getPartitionKeys(); - String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List partCols = new ArrayList(); - List partVals = new ArrayList(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); - } - String defaultPartitionName = FileUtils.makePartName(partCols, partVals); - partNames.remove(defaultPartitionName); - Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); - AggrStats aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, - aggrStatsAllButDefaultPartition, null); - LOG.debug("CachedStore: updated cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}", - catName, dbName, tblName); - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, e); - } - } } @Override @@ -971,27 +787,7 @@ public boolean openTransaction() { @Override public boolean commitTransaction() { - if (!rawStore.commitTransaction()) { - return false; - } - - // In case of event based update, shared cache is not updated directly to avoid inconsistency. - // For example, if metastore B add a partition, then metastore A drop a partition later. However, on metastore A, - // it first get drop partition request, then from notification, create the partition. If there's no tombstone - // entry in partition cache to tell drop is after creation, we end up consumes the creation request. Though - // eventually there's drop partition notification, but during the interim, later event takes precedence. 
- // So we will not update the cache during raw store operation but wait during commit transaction to make sure that - // the event related to the current transactions are updated in the cache and thus we can support strong - // consistency in case there is only one metastore. - if (canUseEvents) { - try { - triggerUpdateUsingEvent(rawStore); - } catch (Exception e) { - //TODO : Not sure how to handle it as the commit is already done in the object store. - LOG.error("Failed to update cache", e); - } - } - return true; + return rawStore.commitTransaction(); } @Override @@ -1007,70 +803,40 @@ public void rollbackTransaction() { @Override public void createCatalog(Catalog cat) throws MetaException { rawStore.createCatalog(cat); - // in case of event based cache update, cache will not be updated for catalog. - if (!canUseEvents) { - sharedCache.addCatalogToCache(cat); - } } @Override public void alterCatalog(String catName, Catalog cat) throws MetaException, InvalidOperationException { rawStore.alterCatalog(catName, cat); - // in case of event based cache update, cache will not be updated for catalog. - if (!canUseEvents) { - sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat); - } } @Override public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException { - // in case of event based cache update, cache will not be updated for catalog. - if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) { - return rawStore.getCatalog(catalogName); - } - Catalog cat = sharedCache.getCatalogFromCache(normalizeIdentifier(catalogName)); - if (cat == null) { - throw new NoSuchObjectException(); - } - return cat; + return rawStore.getCatalog(catalogName); } @Override public List getCatalogs() throws MetaException { - // in case of event based cache update, cache will not be updated for catalog. - if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) { - return rawStore.getCatalogs(); - } - return sharedCache.listCachedCatalogs(); + return rawStore.getCatalogs(); } @Override public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException { rawStore.dropCatalog(catalogName); - - // in case of event based cache update, cache will not be updated for catalog. - if (!canUseEvents) { - catalogName = catalogName.toLowerCase(); - sharedCache.removeCatalogFromCache(catalogName); - } } @Override public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); - // in case of event based cache update, cache will be updated during commit. - if (!canUseEvents) { - sharedCache.addDatabaseToCache(db); - } + // Enforce monotonic reads for db + triggerUpdateUsingEventAdhoc(getRawStore()); + sharedCache.addDatabaseToCache(db); } @Override public Database getDatabase(String catName, String dbName) throws NoSuchObjectException { - // in case of event based cache update, cache will be updated during commit. So within active transaction, read - // directly from rawStore to avoid reading stale data as the data updated during same transaction will not be - // updated in the cache. 
- if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) { + if (!sharedCache.isDatabaseCachePrewarmed() || catchingup) { return rawStore.getDatabase(catName, dbName); } dbName = dbName.toLowerCase(); @@ -1085,8 +851,9 @@ public Database getDatabase(String catName, String dbName) throws NoSuchObjectEx @Override public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException { boolean succ = rawStore.dropDatabase(catName, dbName); - if (succ && !canUseEvents) { - // in case of event based cache update, cache will be updated during commit. + if (succ) { + // Enforce monotonic reads for db + triggerUpdateUsingEventAdhoc(getRawStore()); sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName)); } @@ -1097,8 +864,9 @@ public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectEx public boolean alterDatabase(String catName, String dbName, Database db) throws NoSuchObjectException, MetaException { boolean succ = rawStore.alterDatabase(catName, dbName, db); - if (succ && !canUseEvents) { - // in case of event based cache update, cache will be updated during commit. + if (succ) { + // Enforce monotonic reads for db + triggerUpdateUsingEventAdhoc(getRawStore()); sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName), db); } @@ -1107,7 +875,7 @@ public boolean alterDatabase(String catName, String dbName, Database db) @Override public List getDatabases(String catName, String pattern) throws MetaException { - if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) { + if (!sharedCache.isDatabaseCachePrewarmed() || catchingup) { return rawStore.getDatabases(catName, pattern); } return sharedCache.listCachedDatabases(catName, pattern); @@ -1115,7 +883,7 @@ public boolean alterDatabase(String catName, String dbName, Database db) @Override public List getAllDatabases(String catName) throws MetaException { - if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) { + if (!sharedCache.isDatabaseCachePrewarmed() || catchingup) { return rawStore.getAllDatabases(catName); } return sharedCache.listCachedDatabases(catName); @@ -1157,33 +925,37 @@ private void validateTableType(Table tbl) { @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException { rawStore.createTable(tbl); - // in case of event based cache update, cache will be updated during commit. - if (canUseEvents) { - return; - } String catName = normalizeIdentifier(tbl.getCatName()); String dbName = normalizeIdentifier(tbl.getDbName()); String tblName = normalizeIdentifier(tbl.getTableName()); if (!shouldCacheTable(catName, dbName, tblName)) { return; } - validateTableType(tbl); - sharedCache.addTableToCache(catName, dbName, tblName, tbl); + if (!isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + validateTableType(tbl); + sharedCache.addTableToCache(catName, dbName, tblName, tbl); + } } @Override public boolean dropTable(String catName, String dbName, String tblName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropTable(catName, dbName, tblName); - // in case of event based cache update, cache will be updated during commit. 
- if (succ && !canUseEvents) { + if (succ) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { return succ; } - sharedCache.removeTableFromCache(catName, dbName, tblName); + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + sharedCache.removeTableFromCache(catName, dbName, tblName); + } } return succ; } @@ -1198,7 +970,7 @@ public Table getTable(String catName, String dbName, String tblName, String vali catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.getTable(catName, dbName, tblName, validWriteIds); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1240,15 +1012,19 @@ public Table getTable(String catName, String dbName, String tblName, String vali @Override public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartition(part); - // in case of event based cache update, cache will be updated during commit. - if (succ && !canUseEvents) { + if (succ) { String dbName = normalizeIdentifier(part.getDbName()); String tblName = normalizeIdentifier(part.getTableName()); String catName = part.isSetCatName() ? normalizeIdentifier(part.getCatName()) : DEFAULT_CATALOG_NAME; if (!shouldCacheTable(catName, dbName, tblName)) { return succ; } - sharedCache.addPartitionToCache(catName, dbName, tblName, part); + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + sharedCache.addPartitionToCache(catName, dbName, tblName, part); + } } return succ; } @@ -1257,15 +1033,19 @@ public boolean addPartition(Partition part) throws InvalidObjectException, MetaE public boolean addPartitions(String catName, String dbName, String tblName, List parts) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(catName, dbName, tblName, parts); - // in case of event based cache update, cache will be updated during commit. 
- if (succ && !canUseEvents) { + if (succ) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { return succ; } - sharedCache.addPartitionsToCache(catName, dbName, tblName, parts); + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + sharedCache.addPartitionsToCache(catName, dbName, tblName, parts); + } } return succ; } @@ -1274,18 +1054,22 @@ public boolean addPartitions(String catName, String dbName, String tblName, List public boolean addPartitions(String catName, String dbName, String tblName, PartitionSpecProxy partitionSpec, boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(catName, dbName, tblName, partitionSpec, ifNotExists); - // in case of event based cache update, cache will be updated during commit. - if (succ && !canUseEvents) { + if (succ) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { return succ; } - PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); - while (iterator.hasNext()) { - Partition part = iterator.next(); - sharedCache.addPartitionToCache(catName, dbName, tblName, part); + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition part = iterator.next(); + sharedCache.addPartitionToCache(catName, dbName, tblName, part); + } } } return succ; @@ -1304,7 +1088,7 @@ public Partition getPartition(String catName, String dbName, String tblName, catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.getPartition( catName, dbName, tblName, part_vals, validWriteIds); } @@ -1335,7 +1119,7 @@ public boolean doesPartitionExist(String catName, String dbName, String tblName, catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1350,15 +1134,19 @@ public boolean doesPartitionExist(String catName, String dbName, String tblName, public boolean dropPartition(String catName, String dbName, String tblName, List<String> part_vals) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropPartition(catName, dbName, tblName, part_vals); - // in case of event based cache update, cache will be updated during commit. 
- if (succ && !canUseEvents) { + if (succ) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { return succ; } - sharedCache.removePartitionFromCache(catName, dbName, tblName, part_vals); + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + sharedCache.removePartitionFromCache(catName, dbName, tblName, part_vals); + } } return succ; } @@ -1367,21 +1155,22 @@ public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames) throws MetaException, NoSuchObjectException { rawStore.dropPartitions(catName, dbName, tblName, partNames); - // in case of event based cache update, cache will be updated during commit. - if (canUseEvents) { - return; - } catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { return; } - List<List<String>> partVals = new ArrayList<>(); - for (String partName : partNames) { - partVals.add(partNameToVals(partName)); + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + List<List<String>> partVals = new ArrayList<>(); + for (String partName : partNames) { + partVals.add(partNameToVals(partName)); + } + sharedCache.removePartitionsFromCache(catName, dbName, tblName, partVals); } - sharedCache.removePartitionsFromCache(catName, dbName, tblName, partVals); } @Override @@ -1390,7 +1179,7 @@ public void dropPartitions(String catName, String dbName, String tblName, List<String> getTables(String catName, String dbName, String pattern) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || - (canUseEvents && rawStore.isActiveTransaction())) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get()) { return rawStore.getTables(catName, dbName, pattern); } return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), @@ -1461,8 +1249,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena @Override public List<String> getTables(String catName, String dbName, String pattern, TableType tableType) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()|| !isCachedAllMetadata.get() - || (canUseEvents && rawStore.isActiveTransaction())) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()|| !isCachedAllMetadata.get() || catchingup) { return rawStore.getTables(catName, dbName, pattern, tableType); } return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), @@ -1485,8 +1272,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames, List<String> tableTypes) throws MetaException { // TODO Check if all required tables are allowed, if so, get it from cache - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || - (canUseEvents && rawStore.isActiveTransaction())) { + if 
(!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || catchingup) { return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes); } return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName), @@ -1497,15 +1283,12 @@ public void updateCreationMetadata(String catName, String dbname, String tablena @Override public List<Table>
getTableObjectsByName(String catName, String dbName, List tblNames) throws MetaException, UnknownDBException { - if (canUseEvents && rawStore.isActiveTransaction()) { - return rawStore.getTableObjectsByName(catName, dbName, tblNames); - } dbName = normalizeIdentifier(dbName); catName = normalizeIdentifier(catName); boolean missSomeInCache = false; for (String tblName : tblNames) { tblName = normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { missSomeInCache = true; break; } @@ -1527,15 +1310,13 @@ public void updateCreationMetadata(String catName, String dbname, String tablena if (tbl != null) { tables.add(tbl); } - tables.add(tbl); } return tables; } @Override public List getAllTables(String catName, String dbName) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || - (canUseEvents && rawStore.isActiveTransaction())) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || catchingup) { return rawStore.getAllTables(catName, dbName); } return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), @@ -1555,7 +1336,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.listPartitionNames(catName, dbName, tblName, max_parts); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1585,17 +1366,18 @@ public Partition alterPartition(String catName, String dbName, String tblName, List partVals, Partition newPart, String validWriteIds) throws InvalidObjectException, MetaException { newPart = rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, validWriteIds); - // in case of event based cache update, cache will be updated during commit. - if (canUseEvents) { - return newPart; - } catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { return newPart; } - sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, newPart); + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, newPart); + } return newPart; } @@ -1606,17 +1388,18 @@ public Partition alterPartition(String catName, String dbName, String tblName, throws InvalidObjectException, MetaException { newParts = rawStore.alterPartitions( catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds); - // in case of event based cache update, cache will be updated during commit. 
- if (canUseEvents) { - return newParts; - } catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); if (!shouldCacheTable(catName, dbName, tblName)) { return newParts; } - sharedCache.alterPartitionsInCache(catName, dbName, tblName, partValsList, newParts); + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl != null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + sharedCache.alterPartitionsInCache(catName, dbName, tblName, partValsList, newParts); + } return newParts; } @@ -1661,7 +1444,7 @@ public boolean getPartitionsByExpr(String catName, String dbName, String tblName catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts, result); } List partNames = new LinkedList<>(); @@ -1692,7 +1475,7 @@ public int getNumPartitionsByExpr(String catName, String dbName, String tblName, catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr); } String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); @@ -1726,7 +1509,7 @@ public int getNumPartitionsByExpr(String catName, String dbName, String tblName, catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1913,7 +1696,7 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName, groupNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1940,7 +1723,7 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName, groupNames); } Table table =
sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1969,7 +1752,7 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.listPartitionNamesPs(catName, dbName, tblName, partSpecs, maxParts); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1998,7 +1781,7 @@ public Partition getPartitionWithAuth(String catName, String dbName, String tblN catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.listPartitionsPsWithAuth(catName, dbName, tblName, partSpecs, maxParts, userName, groupNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -2079,59 +1862,28 @@ public static ColumnStatistics adjustColStatForGet(Map tablePara return colStat; } - private static void updateTableColumnsStatsInternal(Configuration conf, ColumnStatistics colStats, - Map newParams, String validWriteIds, - long writeId) throws MetaException { + @Override + public Map updateTableColumnStatistics(ColumnStatistics colStats, + String validWriteIds, long writeId) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + Map newParams = rawStore.updateTableColumnStatistics( + colStats, validWriteIds, writeId); String catName = colStats.getStatsDesc().isSetCatName() ? - normalizeIdentifier(colStats.getStatsDesc().getCatName()) : - getDefaultCatalog(conf); + normalizeIdentifier(colStats.getStatsDesc().getCatName()) : + getDefaultCatalog(conf); String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); if (!shouldCacheTable(catName, dbName, tblName)) { - return; + return newParams; } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); - if (table == null) { - // The table is not yet loaded in cache - return; + if (table!=null && !isTransactionalTable(table)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + sharedCache.alterTableInCache(catName, dbName, tblName, table); + sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj()); } - boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters()); - if (isTxn && validWriteIds != null) { - if (!areTxnStatsSupported) { - StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); - } else { - String errorMsg = ObjectStore.verifyStatsChangeCtx(TableName.getDbTable(dbName, tblName), - table.getParameters(), newParams, writeId, validWriteIds, true); - if (errorMsg != null) { - throw new MetaException(errorMsg); - } - if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, table.getWriteId(), - validWriteIds, true)) { - // Make sure we set the flag to invalid regardless of the current value. 
- StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); - LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " - + table.getDbName() + "." + table.getTableName()); - } - } - } - - table.setWriteId(writeId); - table.setParameters(newParams); - sharedCache.alterTableInCache(catName, dbName, tblName, table); - sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj()); - } - - @Override - public Map updateTableColumnStatistics(ColumnStatistics colStats, - String validWriteIds, long writeId) - throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { - Map newParams = rawStore.updateTableColumnStatistics( - colStats, validWriteIds, writeId); - // in case of event based cache update, cache will be updated during commit. - if (newParams != null && !canUseEvents) { - updateTableColumnsStatsInternal(conf, colStats, newParams, null, writeId); - } return newParams; } @@ -2149,7 +1901,7 @@ public ColumnStatistics getTableColumnStatistics( catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || catchingup) { return rawStore.getTableColumnStatistics( catName, dbName, tblName, colNames, validWriteIds); } @@ -2174,14 +1926,16 @@ public boolean deleteTableColumnStatistics(String catName, String dbName, String String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.deleteTableColumnStatistics(catName, dbName, tblName, colName); - // in case of event based cache update, cache is updated during commit txn - if (succ && !canUseEvents) { - catName = normalizeIdentifier(catName); - dbName = normalizeIdentifier(dbName); - tblName = normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { - return succ; - } + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); sharedCache.removeTableColStatsFromCache(catName, dbName, tblName, colName); } return succ; @@ -2193,15 +1947,17 @@ public boolean deleteTableColumnStatistics(String catName, String dbName, String throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { Map newParams = rawStore.updatePartitionColumnStatistics( colStats, partVals, validWriteIds, writeId); - // in case of event based cache update, cache is updated during commit txn - if (newParams != null && !canUseEvents) { - String catName = colStats.getStatsDesc().isSetCatName() ? - normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME; - String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); - String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); - if (!shouldCacheTable(catName, dbName, tblName)) { - return newParams; - } + String catName = colStats.getStatsDesc().isSetCatName() ? 
+ normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME; + String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); + String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); + if (!shouldCacheTable(catName, dbName, tblName)) { + return newParams; + } + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); Partition part = getPartition(catName, dbName, tblName, partVals); part.setParameters(newParams); sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, part); @@ -2240,14 +1996,16 @@ public boolean deletePartitionColumnStatistics(String catName, String dbName, St throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName); - // in case of event based cache update, cache is updated during commit txn. - if (succ && !canUseEvents) { - catName = normalizeIdentifier(catName); - dbName = normalizeIdentifier(dbName); - tblName = normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { - return succ; - } + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl!=null && !isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); sharedCache.removePartitionColStatsFromCache(catName, dbName, tblName, partVals, colName); } return succ; @@ -2271,7 +2029,7 @@ public AggrStats get_aggr_stats_for(String catName, String dbName, String tblNam // TODO: we currently cannot do transactional checks for stats here // (incl. due to lack of sync w.r.t. the below rawStore call). // In case the cache is updated using events, aggregate is calculated locally and thus can be read from cache. - if (!shouldCacheTable(catName, dbName, tblName) || (writeIdList != null && !canUseEvents)) { + if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null || catchingup) { return rawStore.get_aggr_stats_for( catName, dbName, tblName, partNames, colNames, writeIdList); } @@ -2304,7 +2062,9 @@ public AggrStats get_aggr_stats_for(String catName, String dbName, String tblNam LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}", tblName, partNames, colNames); MergedColumnStatsForPartitions mergedColStats = mergeColStatsForPartitions(catName, dbName, tblName, - partNames, colNames, sharedCache, type, writeIdList); + partNames, colNames, sharedCache, type, writeIdList, + MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_NDV_DENSITY_FUNCTION), + MetastoreConf.getDoubleVar(getConf(), ConfVars.STATS_NDV_TUNER)); if (mergedColStats == null) { LOG.info("Aggregate stats of partition " + TableName.getQualified(catName, dbName, tblName) + "." + partNames + " for columns " + colNames + " is not present in cache. 
Getting it from raw store"); @@ -2313,12 +2073,11 @@ public AggrStats get_aggr_stats_for(String catName, String dbName, String tblNam return new AggrStats(mergedColStats.getColStats(), mergedColStats.getPartsFound()); } - private MergedColumnStatsForPartitions mergeColStatsForPartitions( + @VisibleForTesting + static MergedColumnStatsForPartitions mergeColStatsForPartitions( String catName, String dbName, String tblName, List partNames, List colNames, - SharedCache sharedCache, StatsType type, String writeIdList) throws MetaException { - final boolean useDensityFunctionForNDVEstimation = - MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_NDV_DENSITY_FUNCTION); - final double ndvTuner = MetastoreConf.getDoubleVar(getConf(), ConfVars.STATS_NDV_TUNER); + SharedCache sharedCache, StatsType type, String writeIdList, + boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { Map> colStatsMap = new HashMap<>(); long partsFound = partNames.size(); Map, Long> partNameToWriteId = writeIdList != null ? new HashMap<>() : null; @@ -2379,23 +2138,21 @@ private MergedColumnStatsForPartitions mergeColStatsForPartitions( List colAggrStats = MetaStoreServerUtils.aggrPartitionStats(colStatsMap, partNames, partsFound == partNames.size(), useDensityFunctionForNDVEstimation, ndvTuner); - if (canUseEvents) { - if (type == StatsType.ALL) { - sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), new AggrStats(colAggrStats, partsFound), - null, partNameToWriteId); - } else if (type == StatsType.ALLBUTDEFAULT) { - sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), null, - new AggrStats(colAggrStats, partsFound), partNameToWriteId); - } + if (type == StatsType.ALL) { + sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), new AggrStats(colAggrStats, partsFound), + null, partNameToWriteId); + } else if (type == StatsType.ALLBUTDEFAULT) { + sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), null, + new AggrStats(colAggrStats, partsFound), partNameToWriteId); } return new MergedColumnStatsForPartitions(colAggrStats, partsFound); } - class MergedColumnStatsForPartitions { + static class MergedColumnStatsForPartitions { List colStats = new ArrayList(); long partsFound; @@ -2706,20 +2463,20 @@ public int getDatabaseCount() throws MetaException { // TODO constraintCache List constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints); - // in case of event based cache update, cache is updated during commit. - if (canUseEvents) { - return constraintNames; - } - String dbName = normalizeIdentifier(tbl.getDbName()); - String tblName = normalizeIdentifier(tbl.getTableName()); - String catName = tbl.isSetCatName() ? 
normalizeIdentifier(tbl.getCatName()) : - DEFAULT_CATALOG_NAME; - if (!shouldCacheTable(catName, dbName, tblName)) { - return constraintNames; + if (!isTransactionalTable(tbl)) { + // Enforce monotonic reads for external table + triggerUpdateUsingEventAdhoc(getRawStore()); + String dbName = normalizeIdentifier(tbl.getDbName()); + String tblName = normalizeIdentifier(tbl.getTableName()); + String catName = tbl.isSetCatName() ? normalizeIdentifier(tbl.getCatName()) : + DEFAULT_CATALOG_NAME; + if (!shouldCacheTable(catName, dbName, tblName)) { + return constraintNames; + } + sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getCatName()), + StringUtils.normalizeIdentifier(tbl.getDbName()), + StringUtils.normalizeIdentifier(tbl.getTableName()), tbl); } - sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getCatName()), - StringUtils.normalizeIdentifier(tbl.getDbName()), - StringUtils.normalizeIdentifier(tbl.getTableName()), tbl); return constraintNames; } @@ -3057,12 +2814,6 @@ static boolean isBlacklistWhitelistEmpty(Configuration conf) { && MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST).isEmpty(); } - @VisibleForTesting - void resetCatalogCache() { - sharedCache.resetCatalogCache(); - setCachePrewarmedState(false); - } - @Override public void addRuntimeStat(RuntimeStat stat) throws MetaException { rawStore.addRuntimeStat(stat); @@ -3093,4 +2844,17 @@ public int deleteRuntimeStats(int maxRetainSecs) throws MetaException { String dbName, String tableName) throws MetaException, NoSuchObjectException { return rawStore.getPartitionColsWithStats(catName, dbName, tableName); } + + public static boolean isTransactionalTable(org.apache.hadoop.hive.metastore.api.Table table) { + return table != null && table.getParameters() != null && + isTablePropertyTransactional(table.getParameters()); + } + + public static boolean isTablePropertyTransactional(Map parameters) { + String resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (resultStr == null) { + resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return resultStr != null && resultStr.equalsIgnoreCase("true"); + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 05cf70b..323b41c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -67,10 +67,6 @@ public class SharedCache { private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true); - private boolean isCatalogCachePrewarmed = false; - private Map catalogCache = new TreeMap<>(); - private HashSet catalogsDeletedDuringPrewarm = new HashSet<>(); - private AtomicBoolean isCatalogCacheDirty = new AtomicBoolean(false); // For caching Database objects. 
Key is database name private Map databaseCache = new TreeMap<>(); @@ -386,32 +382,6 @@ public void alterPartitions(List> partValsList, List new } } - public void refreshPartitions(List partitions, SharedCache sharedCache) { - Map newPartitionCache = new HashMap(); - try { - tableLock.writeLock().lock(); - for (Partition part : partitions) { - if (isPartitionCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition cache update for table: " + getTable().getTableName() - + "; the partition list we have is dirty."); - return; - } - String key = CacheUtils.buildPartitionCacheKey(part.getValues()); - PartitionWrapper wrapper = partitionCache.get(key); - if (wrapper != null) { - if (wrapper.getSdHash() != null) { - sharedCache.decrSd(wrapper.getSdHash()); - } - } - wrapper = makePartitionWrapper(part, sharedCache); - newPartitionCache.put(key, wrapper); - } - partitionCache = newPartitionCache; - } finally { - tableLock.writeLock().unlock(); - } - } - public boolean updateTableColStats(List colStatsForTable) { try { tableLock.writeLock().lock(); @@ -454,27 +424,6 @@ public boolean updateTableColStats(List colStatsForTable) { } } - public void refreshTableColStats(List colStatsForTable) { - Map newTableColStatsCache = - new HashMap(); - try { - tableLock.writeLock().lock(); - for (ColumnStatisticsObj colStatObj : colStatsForTable) { - if (isTableColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table col stats cache update for table: " - + getTable().getTableName() + "; the table col stats list we have is dirty."); - return; - } - String key = colStatObj.getColName(); - // TODO: get rid of deepCopy after making sure callers don't use references - newTableColStatsCache.put(key, colStatObj.deepCopy()); - } - tableColStatsCache = newTableColStatsCache; - } finally { - tableLock.writeLock().unlock(); - } - } - public ColumnStatistics getCachedTableColStats(ColumnStatisticsDesc csd, List colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException { @@ -675,42 +624,6 @@ public void removeAllPartitionColStats() { } } - public void refreshPartitionColStats(List partitionColStats) { - Map newPartitionColStatsCache = - new HashMap(); - try { - tableLock.writeLock().lock(); - String tableName = StringUtils.normalizeIdentifier(getTable().getTableName()); - for (ColumnStatistics cs : partitionColStats) { - if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition column stats cache update for table: " - + getTable().getTableName() + "; the partition column stats list we have is dirty"); - return; - } - List partVal; - try { - partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null); - List colStatsObjs = cs.getStatsObj(); - for (ColumnStatisticsObj colStatObj : colStatsObjs) { - if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition column stats cache update for table: " - + getTable().getTableName() + "; the partition column list we have is dirty"); - return; - } - String key = - CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName()); - newPartitionColStatsCache.put(key, colStatObj.deepCopy()); - } - } catch (MetaException e) { - LOG.debug("Unable to cache partition column stats for table: " + tableName, e); - } - } - partitionColStatsCache = newPartitionColStatsCache; - } finally { - tableLock.writeLock().unlock(); - } - } - public List getAggrPartitionColStats(List colNames, StatsType statsType) { List colStats = new ArrayList(); @@ 
-935,95 +848,6 @@ public ColumnStatisticsObj getColumnStatisticsObj() { } } - public void populateCatalogsInCache(Collection catalogs) { - for (Catalog cat : catalogs) { - Catalog catCopy = cat.deepCopy(); - // ObjectStore also stores db name in lowercase - catCopy.setName(catCopy.getName().toLowerCase()); - try { - cacheLock.writeLock().lock(); - // Since we allow write operations on cache while prewarm is happening: - // 1. Don't add databases that were deleted while we were preparing list for prewarm - // 2. Skip overwriting exisiting db object - // (which is present because it was added after prewarm started) - if (catalogsDeletedDuringPrewarm.contains(catCopy.getName())) { - continue; - } - catalogCache.putIfAbsent(catCopy.getName(), catCopy); - catalogsDeletedDuringPrewarm.clear(); - isCatalogCachePrewarmed = true; - } finally { - cacheLock.writeLock().unlock(); - } - } - } - - public Catalog getCatalogFromCache(String name) { - Catalog cat = null; - try { - cacheLock.readLock().lock(); - if (catalogCache.get(name) != null) { - cat = catalogCache.get(name).deepCopy(); - } - } finally { - cacheLock.readLock().unlock(); - } - return cat; - } - - public void addCatalogToCache(Catalog cat) { - try { - cacheLock.writeLock().lock(); - Catalog catCopy = cat.deepCopy(); - // ObjectStore also stores db name in lowercase - catCopy.setName(catCopy.getName().toLowerCase()); - catalogCache.put(cat.getName(), catCopy); - isCatalogCacheDirty.set(true); - } finally { - cacheLock.writeLock().unlock(); - } - } - - public void alterCatalogInCache(String catName, Catalog newCat) { - try { - cacheLock.writeLock().lock(); - removeCatalogFromCache(catName); - addCatalogToCache(newCat.deepCopy()); - } finally { - cacheLock.writeLock().unlock(); - } - } - - public void removeCatalogFromCache(String name) { - name = normalizeIdentifier(name); - try { - cacheLock.writeLock().lock(); - // If db cache is not yet prewarmed, add this to a set which the prewarm thread can check - // so that the prewarm thread does not add it back - if (!isCatalogCachePrewarmed) { - catalogsDeletedDuringPrewarm.add(name); - } - if (catalogCache.remove(name) != null) { - isCatalogCacheDirty.set(true); - } - } finally { - cacheLock.writeLock().unlock(); - } - } - - public List listCachedCatalogs() { - try { - cacheLock.readLock().lock(); - return new ArrayList<>(catalogCache.keySet()); - } finally { - cacheLock.readLock().unlock(); - } - } - - public boolean isCatalogCachePrewarmed() { - return isCatalogCachePrewarmed; - } - public Database getDatabaseFromCache(String catName, String name) { Database db = null; try { @@ -1147,23 +971,6 @@ public void alterDatabaseInCache(String catName, String dbName, Database newDb) } } - public boolean refreshDatabasesInCache(List databases) { - if (isDatabaseCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping database cache update; the database list we have is dirty."); - return false; - } - try { - cacheLock.writeLock().lock(); - databaseCache.clear(); - for (Database db : databases) { - addDatabaseToCache(db); - } - return true; - } finally { - cacheLock.writeLock().unlock(); - } - } - public int getCachedDatabaseCount() { try { cacheLock.readLock().lock(); @@ -1445,38 +1252,6 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN return tableNames; } - public boolean refreshTablesInCache(String catName, String dbName, List
tables) { - if (isTableCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table cache update; the table list we have is dirty."); - return false; - } - Map newCacheForDB = new TreeMap<>(); - for (Table tbl : tables) { - String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - tblWrapper.updateTableObj(tbl, this); - } else { - tblWrapper = createTableWrapper(catName, dbName, tblName, tbl); - } - newCacheForDB.put(CacheUtils.buildTableKey(catName, dbName, tblName), tblWrapper); - } - try { - cacheLock.writeLock().lock(); - Iterator> entryIterator = tableCache.entrySet().iterator(); - while (entryIterator.hasNext()) { - String key = entryIterator.next().getKey(); - if (key.startsWith(CacheUtils.buildDbKeyWithDelimiterSuffix(catName, dbName))) { - entryIterator.remove(); - } - } - tableCache.putAll(newCacheForDB); - return true; - } finally { - cacheLock.writeLock().unlock(); - } - } - public ColumnStatistics getTableColStatsFromCache(String catName, String dbName, String tblName, List colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException { try { @@ -1508,20 +1283,6 @@ public void removeTableColStatsFromCache(String catName, String dbName, String t } } - public void removeAllTableColStatsFromCache(String catName, String dbName, String tblName) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - tblWrapper.removeAllTableColStats(); - } else { - LOG.info("Table " + tblName + " is missing from cache."); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public void updateTableColStatsInCache(String catName, String dbName, String tableName, List colStatsForTable) { try { @@ -1538,22 +1299,6 @@ public void updateTableColStatsInCache(String catName, String dbName, String tab } } - public void refreshTableColStatsInCache(String catName, String dbName, String tableName, - List colStatsForTable) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = - tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName)); - if (tblWrapper != null) { - tblWrapper.refreshTableColStats(colStatsForTable); - } else { - LOG.info("Table " + tableName + " is missing from cache."); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public int getCachedTableCount() { try { cacheLock.readLock().lock(); @@ -1727,19 +1472,6 @@ public void alterPartitionsInCache(String catName, String dbName, String tblName } } - public void refreshPartitionsInCache(String catName, String dbName, String tblName, - List partitions) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - tblWrapper.refreshPartitions(partitions, this); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public void removePartitionColStatsFromCache(String catName, String dbName, String tblName, List partVals, String colName) { try { @@ -1753,18 +1485,6 @@ public void removePartitionColStatsFromCache(String catName, String dbName, Stri } } - public void removeAllPartitionColStatsFromCache(String catName, String dbName, String tblName) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - 
tblWrapper.removeAllPartitionColStats(); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public void updatePartitionColStatsInCache(String catName, String dbName, String tableName, List partVals, List colStatsObjs) { try { @@ -1812,19 +1532,6 @@ public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, Strin return colStatObjs; } - public void refreshPartitionColStatsInCache(String catName, String dbName, String tblName, - List partitionColStats) { - try { - cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - tblWrapper.refreshPartitionColStats(partitionColStats); - } - } finally { - cacheLock.readLock().unlock(); - } - } - public List getAggrStatsFromCache(String catName, String dbName, String tblName, List colNames, StatsType statsType) { try { @@ -1909,18 +1616,7 @@ public synchronized StorageDescriptor getSdFromCache(byte[] sdHash) { return sdCache; } - /** - * This resets the contents of the cataog cache so that we can re-fill it in another test. - */ - void resetCatalogCache() { - isCatalogCachePrewarmed = false; - catalogCache.clear(); - catalogsDeletedDuringPrewarm.clear(); - isCatalogCacheDirty.set(false); - } - void clearDirtyFlags() { - isCatalogCacheDirty.set(false); isDatabaseCacheDirty.set(false); isTableCacheDirty.set(false); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java index ba61a08..0e57835 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java @@ -39,6 +39,7 @@ private Map parameters; private List partVals; private Table tableObj; + private String writeIds; /** * @param statsObj Columns statistics Info. 
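The stat events above now carry the writer's valid write-id list as a string (writeIds) alongside the single numeric writeId. The patch does not show how consumers parse that string, so the following is a minimal consumer-side sketch under the assumption that it is a serialized ValidWriteIdList; "event" stands for a received UpdatePartitionColumnStatEvent:

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    // Hypothetical consumer (not part of the patch): reconstruct the write-id
    // list carried by the event and check whether the event's own writeId is
    // visible under it before trusting the incoming column statistics.
    ValidWriteIdList writeIdList = new ValidReaderWriteIdList(event.getWriteIds());
    boolean visible = writeIdList.isWriteIdValid(event.getWriteId());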
@@ -49,13 +50,14 @@ * @param handler handler that is firing the event */ public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, List partVals, Map parameters, - Table tableObj, long writeId, IHMSHandler handler) { + Table tableObj, long writeId, String writeIds, IHMSHandler handler) { super(true, handler); this.partColStats = statsObj; this.writeId = writeId; this.parameters = parameters; this.partVals = partVals; this.tableObj = tableObj; + this.writeIds = writeIds; } /** @@ -64,13 +66,14 @@ public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, List pa * @param handler handler that is firing the event */ public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, List partVals, - Table tableObj, IHMSHandler handler) { + Table tableObj, String writeIds, IHMSHandler handler) { super(true, handler); this.partColStats = statsObj; this.partVals = partVals; this.writeId = 0; this.parameters = null; this.tableObj = tableObj; + this.writeIds = writeIds; } public ColumnStatistics getPartColStats() { @@ -90,4 +93,9 @@ public long getWriteId() { } public Table getTableObj() { return tableObj; } + + public String getWriteIds() { + return writeIds; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java index 71300ab..48d1206 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java @@ -38,22 +38,25 @@ private long writeId; private Map parameters; private Table tableObj; + private String writeIds; /** * @param colStats Columns statistics Info. * @param tableObj table object * @param parameters table parameters to be updated after stats are updated. * @param writeId writeId for the query. 
+ * @param writeIds writeIds for the query * @param handler handler that is firing the event */ public UpdateTableColumnStatEvent(ColumnStatistics colStats, Table tableObj, Map parameters, - long writeId, IHMSHandler handler) { + long writeId, String writeIds, IHMSHandler handler) { super(true, handler); this.colStats = colStats; this.writeId = writeId; this.parameters = parameters; this.tableObj = tableObj; + this.writeIds = writeIds; } /** @@ -76,6 +79,10 @@ public long getWriteId() { return writeId; } + public String getWriteIds() { + return writeIds; + } + public Map getTableParameters() { return parameters; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java index aa83da4..4518d79 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java @@ -289,9 +289,9 @@ public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, public JSONUpdateTableColumnStatMessage buildUpdateTableColumnStatMessage(ColumnStatistics colStats, Table tableObj, Map parameters, - long writeId) { + long writeId, String writeIds) { return new JSONUpdateTableColumnStatMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), - colStats, tableObj, parameters, writeId); + colStats, tableObj, parameters, writeId, writeIds); } public JSONDeleteTableColumnStatMessage buildDeleteTableColumnStatMessage(String dbName, String colName) { @@ -300,9 +300,9 @@ public JSONDeleteTableColumnStatMessage buildDeleteTableColumnStatMessage(String public JSONUpdatePartitionColumnStatMessage buildUpdatePartitionColumnStatMessage(ColumnStatistics colStats, List partVals, Map parameters, - Table tableObj, long writeId) { + Table tableObj, long writeId, String writeIds) { return new JSONUpdatePartitionColumnStatMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), colStats, partVals, - parameters, tableObj, writeId); + parameters, tableObj, writeId, writeIds); } public JSONDeletePartitionColumnStatMessage buildDeletePartitionColumnStatMessage(String dbName, String colName, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java index e92a0dc..f685bc4 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java @@ -41,4 +41,6 @@ protected UpdatePartitionColumnStatMessage() { public abstract List getPartVals(); public abstract Table getTableObject() throws Exception; + + public abstract String getWriteIds(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java index e3f049c..e5f7ef8 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java @@ -38,4 +38,6 @@ protected UpdateTableColumnStatMessage() { public abstract Map getParameters(); public abstract Table getTableObject() throws Exception; + + public abstract String getWriteIds() throws Exception; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java index fd7fe00..2e4d9de 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java @@ -52,6 +52,9 @@ @JsonProperty private String tableObjJson; + @JsonProperty + private String writeIds; + /** * Default constructor, needed for Jackson. */ @@ -61,7 +64,7 @@ public JSONUpdatePartitionColumnStatMessage() { public JSONUpdatePartitionColumnStatMessage(String server, String servicePrincipal, Long timestamp, ColumnStatistics colStats, List partVals, Map parameters, - Table tableObj, long writeId) { + Table tableObj, long writeId, String writeIds) { this.timestamp = timestamp; this.server = server; this.servicePrincipal = servicePrincipal; @@ -75,6 +78,7 @@ public JSONUpdatePartitionColumnStatMessage(String server, String servicePrincip throw new IllegalArgumentException("Could not serialize JSONUpdatePartitionColumnStatMessage : ", e); } this.parameters = parameters; + this.writeIds = writeIds; } @Override @@ -127,6 +131,11 @@ public Table getTableObject() throws Exception { } @Override + public String getWriteIds() { + return writeIds; + } + + @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java index 275d204..40636c8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java @@ -47,6 +47,9 @@ @JsonProperty private String tableObjJson; + @JsonProperty + private String writeIds; + /** * Default constructor, needed for Jackson. 
*/ @@ -55,7 +58,7 @@ public JSONUpdateTableColumnStatMessage() { public JSONUpdateTableColumnStatMessage(String server, String servicePrincipal, Long timestamp, ColumnStatistics colStats, Table tableObj, Map parameters, - long writeId) { + long writeId, String writeIds) { this.timestamp = timestamp; this.server = server; this.servicePrincipal = servicePrincipal; @@ -68,6 +71,7 @@ public JSONUpdateTableColumnStatMessage(String server, String servicePrincipal, throw new IllegalArgumentException("Could not serialize JSONUpdateTableColumnStatMessage : ", e); } this.parameters = parameters; + this.writeIds = writeIds; } @Override @@ -115,6 +119,11 @@ public Long getWriteId() { } @Override + public String getWriteIds() { + return writeIds; + } + + @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java index 8caa929..560d5dd 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java @@ -271,89 +271,6 @@ public void testPrewarmMemoryEstimation() throws Exception { } @Test - public void testCacheUpdate() throws Exception { - Configuration conf = MetastoreConf.newMetastoreConf(); - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); - MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); - MetaStoreTestUtils.setConfForStandloneMode(conf); - CachedStore cachedStore = new CachedStore(); - CachedStore.clearSharedCache(); - cachedStore.setConfForTest(conf); - ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); - // Prewarm CachedStore - CachedStore.setCachePrewarmedState(false); - CachedStore.prewarm(objectStore); - // Drop basedb1's unpartitioned table - objectStore.dropTable(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName()); - Deadline.startTimer(""); - // Drop a partitions of basedb1's partitioned table - objectStore.dropPartitions(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName(), db1Ptbl1PtnNames); - // Update SharedCache - updateCache(cachedStore); - List allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); - Assert.assertEquals(2, allDatabases.size()); - Assert.assertTrue(allDatabases.contains(db1.getName())); - Assert.assertTrue(allDatabases.contains(db2.getName())); - // cs_db1_ptntbl1 - List db1Tbls = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); - Assert.assertEquals(1, db1Tbls.size()); - Assert.assertTrue(db1Tbls.contains(db1Ptbl1.getTableName())); - List db1Ptns = - cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db1.getName(), db1Ptbl1.getTableName(), -1); - Assert.assertEquals(0, db1Ptns.size()); - // cs_db2_ptntbl1 - List db2Tbls = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); - Assert.assertEquals(2, db2Tbls.size()); - Assert.assertTrue(db2Tbls.contains(db2Utbl1.getTableName())); - Assert.assertTrue(db2Tbls.contains(db2Ptbl1.getTableName())); - List db2Ptns = - cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); - Assert.assertEquals(25, db2Ptns.size()); - Deadline.startTimer(""); - List db2PtnsOS = - 
objectStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); - Assert.assertTrue(db2Ptns.containsAll(db2PtnsOS)); - // Create a new unpartitioned table under basedb1 - Table db1Utbl2 = createUnpartitionedTableObject(db1); - db1Utbl2.setTableName(db1.getName() + "_unptntbl2"); - objectStore.createTable(db1Utbl2); - // Add a new partition to db1PartitionedTable - // Create partitions for cs_db1's partitioned table - db1Ptbl1Ptns = createPartitionObjects(db1Ptbl1).getPartitions(); - Deadline.startTimer(""); - objectStore.addPartition(db1Ptbl1Ptns.get(0)); - objectStore.addPartition(db1Ptbl1Ptns.get(1)); - objectStore.addPartition(db1Ptbl1Ptns.get(2)); - objectStore.addPartition(db1Ptbl1Ptns.get(3)); - objectStore.addPartition(db1Ptbl1Ptns.get(4)); - updateCache(cachedStore); - allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); - Assert.assertEquals(2, allDatabases.size()); - Assert.assertTrue(allDatabases.contains(db1.getName())); - Assert.assertTrue(allDatabases.contains(db2.getName())); - db1Tbls = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); - Assert.assertEquals(2, db1Tbls.size()); - Assert.assertTrue(db1Tbls.contains(db1Ptbl1.getTableName())); - Assert.assertTrue(db1Tbls.contains(db1Utbl2.getTableName())); - db2Tbls = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); - Assert.assertEquals(2, db2Tbls.size()); - Assert.assertTrue(db2Tbls.contains(db2Utbl1.getTableName())); - Assert.assertTrue(db2Tbls.contains(db2Ptbl1.getTableName())); - // cs_db1_ptntbl1 - db1Ptns = cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db1.getName(), db1Ptbl1.getTableName(), -1); - Assert.assertEquals(5, db1Ptns.size()); - // cs_db2_ptntbl1 - db2Ptns = cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); - Assert.assertEquals(25, db2Ptns.size()); - Deadline.startTimer(""); - db2PtnsOS = objectStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); - Assert.assertTrue(db2Ptns.containsAll(db2PtnsOS)); - // Clean up - objectStore.dropTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); - cachedStore.shutdown(); - } - - @Test public void testCreateAndGetDatabase() throws Exception { Configuration conf = MetastoreConf.newMetastoreConf(); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); @@ -432,11 +349,7 @@ public void testDropDatabase() throws Exception { Assert.assertEquals(db1, dbRead); allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); Assert.assertEquals(3, allDatabases.size()); - objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName1); - updateCache(cachedStore); - updateCache(cachedStore); - allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); - Assert.assertEquals(2, allDatabases.size()); + cachedStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName1); cachedStore.shutdown(); } @@ -466,17 +379,6 @@ public void testAlterDatabase() throws Exception { // Read db via ObjectStore Database dbRead = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); Assert.assertEquals(db, dbRead); - // Alter db via ObjectStore - dbOwner = "user3"; - db = new Database(db1); - db.setOwnerName(dbOwner); - objectStore.alterDatabase(DEFAULT_CATALOG_NAME, dbName, db); - db = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); - updateCache(cachedStore); - updateCache(cachedStore); - // Read db via CachedStore - dbRead = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); - Assert.assertEquals(db, dbRead); 
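The deleted test blocks around here all followed the old asynchronous recipe: mutate through ObjectStore, then call updateCache and poll until the background refresh ran. With the event-driven cache, writes issued through CachedStore maintain the shared cache in line with the raw store, so the surviving assertions can run immediately after the mutation. A minimal sketch of the new shape, reusing names from this test class:

    // Write through CachedStore so the shared cache is updated synchronously,
    // then assert without waiting for a background refresh cycle.
    cachedStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName1);
    List<String> allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME);
    Assert.assertEquals(2, allDatabases.size());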
cachedStore.shutdown(); } @@ -510,20 +412,8 @@ public void testCreateAndGetTable() throws Exception { db1Utbl2 = cachedStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); Table tblRead = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); Assert.assertEquals(db1Utbl2, tblRead); - // Create a new unpartitioned table under basedb2 via ObjectStore - Table db2Utbl2 = createUnpartitionedTableObject(db2); - db2Utbl2.setTableName(db2.getName() + "_unptntbl2"); - objectStore.createTable(db2Utbl2); - db2Utbl2 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl2.getDbName(), db2Utbl2.getTableName()); - updateCache(cachedStore); - db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); - Assert.assertEquals(3, db2Tables.size()); - tblRead = cachedStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl2.getDbName(), db2Utbl2.getTableName()); - Assert.assertEquals(db2Utbl2, tblRead); - // Clean up objectStore.dropTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); db1Utbl2 = cachedStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); - objectStore.dropTable(DEFAULT_CATALOG_NAME, db2Utbl2.getDbName(), db2Utbl2.getTableName()); cachedStore.shutdown(); } @@ -656,18 +546,6 @@ public void testAlterTable() throws Exception { Table db1Utbl1ReadOS = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1ReadAlt.getDbName(), db1Utbl1ReadAlt.getTableName()); Assert.assertEquals(db1Utbl1Read, db1Utbl1ReadOS); - // Alter table db2Utbl1 via ObjectStore and read via CachedStore - Table db2Utbl1Read = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName()); - Table db2Utbl1ReadAlt = new Table(db2Utbl1Read); - db2Utbl1ReadAlt.setOwner(newOwner); - objectStore.alterTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName(), db2Utbl1ReadAlt, - "0"); - updateCache(cachedStore); - db2Utbl1Read = - objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1ReadAlt.getDbName(), db2Utbl1ReadAlt.getTableName()); - Table d21Utbl1ReadCS = - cachedStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1ReadAlt.getDbName(), db2Utbl1ReadAlt.getTableName()); - Assert.assertEquals(db2Utbl1Read, d21Utbl1ReadCS); cachedStore.shutdown(); } @@ -696,17 +574,6 @@ public void testDropTable() throws Exception { Table db1Utbl1ReadOS = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1Read.getDbName(), db1Utbl1Read.getTableName()); Assert.assertNull(db1Utbl1ReadOS); - // Drop table db2Utbl1 via ObjectStore and read via CachedStore - Table db2Utbl1Read = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName()); - objectStore.dropTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName()); - db2Tables = objectStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); - Assert.assertEquals(1, db2Tables.size()); - updateCache(cachedStore); - db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); - Assert.assertEquals(1, db2Tables.size()); - Table db2Utbl1ReadCS = - cachedStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName()); - Assert.assertNull(db2Utbl1ReadCS); cachedStore.shutdown(); } @@ -1604,19 +1471,4 @@ private PartitionObjectsAndNames createPartitionObjects(Table table) { return ptnNames; } } - - // This method will return only after the cache has updated once - private void updateCache(CachedStore cachedStore) throws Exception { - int maxTries = 100; - long 
updateCountBefore = cachedStore.getCacheUpdateCount(); - // Start the CachedStore update service - CachedStore.startCacheUpdateService(cachedStore.getConf(), true, false); - while ((cachedStore.getCacheUpdateCount() != (updateCountBefore + 1)) && (maxTries-- > 0)) { - Thread.sleep(1000); - } - if (maxTries <= 0) { - throw new Exception("Unable to update SharedCache in 100 attempts; possibly some bug"); - } - CachedStore.stopCacheUpdateService(100); - } } diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCatalogCaching.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCatalogCaching.java deleted file mode 100644 index 423dce8..0000000 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCatalogCaching.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.cache; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.HiveMetaStore; -import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; -import org.apache.hadoop.hive.metastore.ObjectStore; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest; -import org.apache.hadoop.hive.metastore.api.Catalog; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.client.builder.CatalogBuilder; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.util.Comparator; -import java.util.List; - -/** - * Tests that catalogs are properly cached. - */ -@Category(MetastoreCheckinTest.class) -public class TestCatalogCaching { - private static final String CAT1_NAME = "cat1"; - private static final String CAT2_NAME = "cat2"; - - private ObjectStore objectStore; - private Configuration conf; - private CachedStore cachedStore; - - @Before - public void createObjectStore() throws MetaException, InvalidOperationException { - conf = MetastoreConf.newMetastoreConf(); - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); - MetaStoreTestUtils.setConfForStandloneMode(conf); - objectStore = new ObjectStore(); - objectStore.setConf(conf); - - // Create three catalogs - HiveMetaStore.HMSHandler.createDefaultCatalog(objectStore, new Warehouse(conf)); - - Catalog cat1 = new CatalogBuilder() - .setName(CAT1_NAME) - .setLocation("/tmp/cat1") - .build(); - objectStore.createCatalog(cat1); - Catalog cat2 = new CatalogBuilder() - .setName(CAT2_NAME) - .setLocation("/tmp/cat2") - .build(); - objectStore.createCatalog(cat2); - } - - @After - public void clearCatalogCache() throws MetaException, NoSuchObjectException { - List catalogs = objectStore.getCatalogs(); - for (String catalog : catalogs) objectStore.dropCatalog(catalog); - } - - @Test - public void defaultHiveOnly() throws Exception { - // By default just the Hive catalog should be cached. - cachedStore = new CachedStore(); - cachedStore.setConf(conf); - CachedStore.stopCacheUpdateService(1); - cachedStore.resetCatalogCache(); - - CachedStore.prewarm(objectStore); - - // Only the hive catalog should be cached - List cachedCatalogs = cachedStore.getCatalogs(); - Assert.assertEquals(1, cachedCatalogs.size()); - Assert.assertEquals(Warehouse.DEFAULT_CATALOG_NAME, cachedCatalogs.get(0)); - } - - @Test - public void cacheAll() throws Exception { - // Set the config value to empty string, which should result in all catalogs being cached. 
- Configuration newConf = new Configuration(conf); - MetastoreConf.setVar(newConf, MetastoreConf.ConfVars.CATALOGS_TO_CACHE, ""); - cachedStore = new CachedStore(); - cachedStore.setConf(newConf); - CachedStore.stopCacheUpdateService(1); - objectStore.setConf(newConf); // have to override it with the new conf since this is where - // prewarm gets the conf object - cachedStore.resetCatalogCache(); - - CachedStore.prewarm(objectStore); - - // All the catalogs should be cached - List cachedCatalogs = cachedStore.getCatalogs(); - Assert.assertEquals(3, cachedCatalogs.size()); - cachedCatalogs.sort(Comparator.naturalOrder()); - Assert.assertEquals(CAT1_NAME, cachedCatalogs.get(0)); - Assert.assertEquals(CAT2_NAME, cachedCatalogs.get(1)); - Assert.assertEquals(Warehouse.DEFAULT_CATALOG_NAME, cachedCatalogs.get(2)); - } - - @Test - public void cacheSome() throws Exception { - // Set the config value to 2 catalogs other than hive - Configuration newConf = new Configuration(conf); - MetastoreConf.setVar(newConf, MetastoreConf.ConfVars.CATALOGS_TO_CACHE, CAT1_NAME + "," + CAT2_NAME); - cachedStore = new CachedStore(); - cachedStore.setConf(newConf); - CachedStore.stopCacheUpdateService(1); - objectStore.setConf(newConf); // have to override it with the new conf since this is where - // prewarm gets the conf object - cachedStore.resetCatalogCache(); - - CachedStore.prewarm(objectStore); - - // All the catalogs should be cached - List cachedCatalogs = cachedStore.getCatalogs(); - Assert.assertEquals(2, cachedCatalogs.size()); - cachedCatalogs.sort(Comparator.naturalOrder()); - Assert.assertEquals(CAT1_NAME, cachedCatalogs.get(0)); - Assert.assertEquals(CAT2_NAME, cachedCatalogs.get(1)); - } -}
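Taken together, the CachedStore write paths in this patch repeat one guard: look the table up in the shared cache, and only for non-transactional (external) tables replay pending notification events before mutating the cache, so a reader never observes the cache moving backwards relative to the raw store. A hypothetical consolidation of that repeated block, using the names the patch introduces (triggerUpdateUsingEventAdhoc, isTransactionalTable); this helper is a sketch, not part of the patch:

    // Sketch: shared guard for CachedStore write paths. For external tables,
    // first catch the cache up via the notification log (monotonic reads),
    // then apply the local cache mutation (e.g. removePartitionFromCache).
    private void updateCacheForExternalTable(String catName, String dbName, String tblName,
        Runnable cacheMutation) throws MetaException {
      if (!shouldCacheTable(catName, dbName, tblName)) {
        return;
      }
      Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
      if (tbl != null && !isTransactionalTable(tbl)) {
        triggerUpdateUsingEventAdhoc(getRawStore());
        cacheMutation.run();
      }
    }

Transactional tables are deliberately left alone here: as the removed comments note, their cache entries are reconciled through the commit-time events instead.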