diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 38f6430bc3..169ddcb1a6 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1161,7 +1161,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal // materialized views HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING("hive.materializedview.rewriting", false, "Whether to try to rewrite queries using the materialized views enabled for rewriting"), - HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW("hive.materializedview.rewriting.time.window", 0, + HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW("hive.materializedview.rewriting.time.window", "0s", new TimeValidator(TimeUnit.SECONDS), "Time window, specified in seconds, after which outdated materialized views become invalid for automatic query rewriting.\n" + "For instance, if a materialized view is created and afterwards one of its source tables is changed at " + "moment in time t0, the materialized view will not be considered for rewriting anymore after t0 plus " + @@ -1172,6 +1172,17 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Default file format for CREATE MATERIALIZED VIEW statement"), HIVE_MATERIALIZED_VIEW_SERDE("hive.materializedview.serde", "org.apache.hadoop.hive.ql.io.orc.OrcSerde", "Default SerDe used for materialized views"), + HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_IMPL("hive.metastore.materializations.invalidation.impl", "DEFAULT", + new StringSet("DEFAULT", "DISABLE"), + "The implementation that we should use for the materializations invalidation cache. \n" + + " DEFAULT: Default implementation for invalidation cache\n" + + " DISABLE: Disable invalidation cache (debugging purposes)"), + HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY("hive.metastore.materializations.invalidation.clean.frequency", + "3600s", new TimeValidator(TimeUnit.SECONDS), "Frequency at which timer task runs to remove unnecessary transactions information from" + + "materializations invalidation cache."), + HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION("hive.metastore.materializations.invalidation.max.duration", + "86400s", new TimeValidator(TimeUnit.SECONDS), "Maximum duration for query producing a materialization. After this time, transactions" + + "information that is not relevant for materializations can be removed from invalidation cache."), // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row, // need to remove by hive .13. Also, do not change default (see SMB operator) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 7b7e14071e..6aa61e05ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -44,6 +44,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.Set; import java.util.stream.Collectors; @@ -1293,7 +1294,8 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) { * @throws HiveException */ public List getValidMaterializedViews() throws HiveException { - final long diff = conf.getIntVar(HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) * 1000; + final long diff = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW, TimeUnit.MILLISECONDS); final long minTime = System.currentTimeMillis() - diff; try { // Final result diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java new file mode 100644 index 0000000000..e8037cf960 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class MaterializationsCacheCleanerTask implements MetastoreTaskThread { + private static final Logger LOG = LoggerFactory.getLogger(MaterializationsCacheCleanerTask.class); + + private Configuration conf; + + @Override + public long runFrequency(TimeUnit unit) { + return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY, unit); + } + + @Override + public void setConf(Configuration configuration) { + conf = configuration; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void run() { + long removedCnt = MaterializationsInvalidationCache.get().cleanup(System.currentTimeMillis() - + MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION, TimeUnit.MILLISECONDS)); + if (removedCnt > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Number of transaction entries deleted from materializations cache: " + removedCnt); + } + } + } +} diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java index 92653ae97e..6e54eb9f38 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hive.metastore; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -28,10 +31,13 @@ import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; import org.apache.hadoop.hive.metastore.api.Materialization; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +60,9 @@ /* Singleton */ private static final MaterializationsInvalidationCache SINGLETON = new MaterializationsInvalidationCache(); + /* If this boolean is true, this class has no functionality. Only for debugging purposes. */ + private boolean disable; + /* Key is the database name. Each value is a map from the unique view qualified name to * the materialization invalidation info. This invalidation object contains information * such as the tables used by the materialized view or the invalidation time, i.e., first @@ -62,10 +71,10 @@ new ConcurrentHashMap>(); /* - * Key is a qualified table name. The value is a (sorted) tree set (supporting concurrent - * modifications) that will keep the modifications for a given table in the order that they - * happen. This is useful to quickly check the invalidation time for a given materialized - * view. + * Key is a qualified table name. The value is a (sorted) tree map (supporting concurrent + * modifications) that will keep the modifications for a given table in the order of their + * transaction id. This is useful to quickly check the invalidation time for a given + * materialization. */ private final ConcurrentMap> tableModifications = new ConcurrentHashMap>(); @@ -100,6 +109,14 @@ public synchronized void init(final RawStore store, final TxnStore txnStore) { this.store = store; this.txnStore = txnStore; + // This will only be true for debugging purposes + this.disable = MetastoreConf.getVar(store.getConf(), + MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_IMPL).equals("DISABLE"); + if (disable) { + // Nothing to do + return; + } + if (!initialized) { this.initialized = true; ExecutorService pool = Executors.newCachedThreadPool(); @@ -162,6 +179,10 @@ public void alterMaterializedView(String dbName, String tableName, Set t */ private void addMaterializedView(String dbName, String tableName, Set tablesUsed, String validTxnList, OpType opType) { + if (disable) { + // Nothing to do + return; + } // We are going to create the map for each view in the given database ConcurrentMap cq = new ConcurrentHashMap(); @@ -225,6 +246,10 @@ private void addMaterializedView(String dbName, String tableName, Set ta */ public void notifyTableModification(String dbName, String tableName, long txnId, long newModificationTime) { + if (disable) { + // Nothing to do + return; + } if (LOG.isDebugEnabled()) { LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}", tableName, dbName, txnId, newModificationTime); @@ -246,6 +271,10 @@ public void notifyTableModification(String dbName, String tableName, * @param tableName */ public void dropMaterializedView(String dbName, String tableName) { + if (disable) { + // Nothing to do + return; + } materializations.get(dbName).remove(tableName); } @@ -308,12 +337,12 @@ private long getInvalidationTime(MaterializationInvalidationInfo materialization ValidTxnList txnList = new ValidReadTxnList(txnListString); long firstModificationTimeAfterCreation = 0L; for (String qNameTableUsed : materialization.getTablesUsed()) { - final Long tn = tableModifications.get(qNameTableUsed) - .higherKey(txnList.getHighWatermark()); + final Entry tn = tableModifications.get(qNameTableUsed) + .higherEntry(txnList.getHighWatermark()); if (tn != null) { if (firstModificationTimeAfterCreation == 0L || - tn < firstModificationTimeAfterCreation) { - firstModificationTimeAfterCreation = tn; + tn.getValue() < firstModificationTimeAfterCreation) { + firstModificationTimeAfterCreation = tn.getValue(); } } // Min open txn might be null if there were no open transactions @@ -346,4 +375,105 @@ private long getInvalidationTime(MaterializationInvalidationInfo materialization ALTER } + /** + * Removes transaction events that are not relevant anymore. + * @param minTime events generated before this time (ms) can be deleted from the cache + * @return number of events that were deleted from the cache + */ + public long cleanup(long minTime) { + // To remove, mv should meet two conditions: + // 1) Current time - time of transaction > config parameter, and + // 2) Transaction should not be associated with invalidation of a MV + if (disable || !initialized) { + // Bail out + return 0L; + } + // We execute the cleanup in two steps + // First we gather all the transactions that need to be kept + final Multimap keepTxnInfos = HashMultimap.create(); + for (Map.Entry> e : materializations.entrySet()) { + for (MaterializationInvalidationInfo m : e.getValue().values()) { + ValidTxnList txnList = new ValidReadTxnList(m.getValidTxnList()); + boolean canBeDeleted = false; + String currentTableForInvalidatingTxn = null; + long currentInvalidatingTxnId = 0L; + long currentInvalidatingTxnTime = 0L; + for (String qNameTableUsed : m.getTablesUsed()) { + final Entry tn = tableModifications.get(qNameTableUsed) + .higherEntry(txnList.getHighWatermark()); + if (tn != null) { + if (currentInvalidatingTxnTime == 0L || + tn.getValue() < currentInvalidatingTxnTime) { + // This transaction 1) is the first one examined for this materialization, or + // 2) it is the invalidating transaction. Hence we add it to the transactions to keep. + // 1.- We remove the previous invalidating transaction from the transactions + // to be kept (if needed). + if (canBeDeleted && currentInvalidatingTxnTime < minTime) { + keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId); + } + // 2.- We add this transaction to the transactions that should be kept. + canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(tn.getKey()); + keepTxnInfos.put(qNameTableUsed, tn.getKey()); + // 3.- We record this transaction as the current invalidating transaction. + currentTableForInvalidatingTxn = qNameTableUsed; + currentInvalidatingTxnId = tn.getKey(); + currentInvalidatingTxnTime = tn.getValue(); + } + } + if (txnList.getMinOpenTxn() != null) { + // Invalid transaction list is sorted + int pos = 0; + for (Entry t : tableModifications.get(qNameTableUsed) + .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) { + while (pos < txnList.getInvalidTransactions().length && + txnList.getInvalidTransactions()[pos] != t.getKey()) { + pos++; + } + if (pos >= txnList.getInvalidTransactions().length) { + break; + } + if (currentInvalidatingTxnTime == 0L || + t.getValue() < currentInvalidatingTxnTime) { + // This transaction 1) is the first one examined for this materialization, or + // 2) it is the invalidating transaction. Hence we add it to the transactions to keep. + // 1.- We remove the previous invalidating transaction from the transactions + // to be kept (if needed). + if (canBeDeleted && currentInvalidatingTxnTime < minTime) { + keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId); + } + // 2.- We add this transaction to the transactions that should be kept. + canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(t.getKey()); + keepTxnInfos.put(qNameTableUsed, t.getKey()); + // 3.- We record this transaction as the current invalidating transaction. + currentTableForInvalidatingTxn = qNameTableUsed; + currentInvalidatingTxnId = t.getKey(); + currentInvalidatingTxnTime = t.getValue(); + } + } + } + } + } + } + // Second, we remove the transactions + long removed = 0L; + for (Entry> e : tableModifications.entrySet()) { + Collection c = keepTxnInfos.get(e.getKey()); + for (Iterator> it = e.getValue().entrySet().iterator(); it.hasNext();) { + Entry v = it.next(); + // We need to check again the time because some of the transactions might not be explored + // above, e.g., transactions above the highest transaction mark for all the materialized + // views. + if (v.getValue() < minTime && (c.isEmpty() || !c.contains(v.getKey()))) { + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction removed from cache for table {} -> id: {}, time: {}", + e.getKey(), v.getKey(), v.getValue()); + } + it.remove(); + removed++; + } + } + } + return removed; + } + } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 699a64948f..9f822564bd 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader; import org.apache.hadoop.hive.metastore.HiveAlterHandler; +import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.events.EventCleanerTask; import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager; @@ -553,6 +554,20 @@ public static ConfVars getMetaConf(String name) { "javax.jdo.PersistenceManagerFactoryClass", "org.datanucleus.api.jdo.JDOPersistenceManagerFactory", "class implementing the jdo persistence"), + MATERIALIZATIONS_INVALIDATION_CACHE_IMPL("metastore.materializations.invalidation.impl", + "hive.metastore.materializations.invalidation.impl", "DEFAULT", + new Validator.StringSet("DEFAULT", "DISABLE"), + "The implementation that we should use for the materializations invalidation cache. \n" + + " DEFAULT: Default implementation for invalidation cache\n" + + " DISABLE: Disable invalidation cache (debugging purposes)"), + MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY("metastore.materializations.invalidation.clean.frequency", + "hive.metastore.materializations.invalidation.clean.frequency", + 3600, TimeUnit.SECONDS, "Frequency at which timer task runs to remove unnecessary transaction entries from" + + "materializations invalidation cache."), + MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION("metastore.materializations.invalidation.max.duration", + "hive.metastore.materializations.invalidation.max.duration", + 86400, TimeUnit.SECONDS, "Maximum duration for query producing a materialization. After this time, transaction" + + "entries that are not relevant for materializations can be removed from invalidation cache."), // Parameters for exporting metadata on table drop (requires the use of the) // org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener METADATA_EXPORT_LOCATION("metastore.metadata.export.location", "hive.metadata.export.location", @@ -708,7 +723,9 @@ public static ConfVars getMetaConf(String name) { + "The only supported special character right now is '/'. This flag applies only to quoted table names.\n" + "The default value is true."), TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always", - EventCleanerTask.class.getName() + "," + "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask", + EventCleanerTask.class.getName() + "," + + "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," + + MaterializationsCacheCleanerTask.class.getName(), "Comma separated list of tasks that will be started in separate threads. These will " + "always be started, regardless of whether the metastore is running in embedded mode " + "or in server mode. They must implement " + MetastoreTaskThread.class.getName()), diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java new file mode 100644 index 0000000000..8f8bff6d93 --- /dev/null +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; +import org.apache.hadoop.hive.metastore.api.CreationMetadata; +import org.apache.hadoop.hive.metastore.api.Materialization; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.junit.Assert; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache}. + * The tests focus on arrival of notifications (possibly out of order) and the logic + * to clean up the materializations cache. Tests need to be executed in a certain order + * to avoid interactions among them, as the invalidation cache is a singleton. + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class TestMetaStoreMaterializationsCacheCleaner { + + private static final String DB_NAME = "hive3252"; + private static final String TBL_NAME_1 = "tmptbl1"; + private static final String TBL_NAME_2 = "tmptbl2"; + private static final String TBL_NAME_3 = "tmptbl3"; + private static final String MV_NAME_1 = "mv1"; + private static final String MV_NAME_2 = "mv2"; + + + @Test + public void testCleanerScenario1() throws Exception { + // create mock raw store + final RawStore rawStore = mock(RawStore.class); + when(rawStore.getAllDatabases()).thenReturn(ImmutableList.of()); + Configuration conf = new Configuration(); + conf.set("metastore.materializations.invalidation.impl", "DISABLE"); + when(rawStore.getConf()).thenReturn(conf); + // create mock txn store + final TxnStore txnStore = mock(TxnStore.class); + // initialize invalidation cache (set conf to disable) + MaterializationsInvalidationCache.get().init(rawStore, txnStore); + + // This is a dummy test, invalidation cache is not supposed to + // record any information. + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 1, 1); + int id = 2; + BasicTxnInfo txn2 = createTxnInfo(DB_NAME, TBL_NAME_1, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, id, id); + // Create tbl2 (nothing to do) + id = 3; + BasicTxnInfo txn3 = createTxnInfo(DB_NAME, TBL_NAME_1, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, id, id); + // Cleanup (current = 4, duration = 4) -> Does nothing + long removed = MaterializationsInvalidationCache.get().cleanup(0L); + Assert.assertEquals(0L, removed); + // Create mv1 + Table mv1 = mock(Table.class); + when(mv1.getDbName()).thenReturn(DB_NAME); + when(mv1.getTableName()).thenReturn(MV_NAME_1); + CreationMetadata mockCM1 = new CreationMetadata( + DB_NAME, MV_NAME_1, + ImmutableSet.of( + DB_NAME + "." + TBL_NAME_1, + DB_NAME + "." + TBL_NAME_2)); + // Create txn list (highWatermark=4;minOpenTxn=Long.MAX_VALUE) + mockCM1.setValidTxnList("3:" + Long.MAX_VALUE + "::"); + when(mv1.getCreationMetadata()).thenReturn(mockCM1); + MaterializationsInvalidationCache.get().createMaterializedView(mockCM1.getDbName(), mockCM1.getTblName(), + mockCM1.getTablesUsed(), mockCM1.getValidTxnList()); + Map invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1)); + Assert.assertTrue(invalidationInfos.isEmpty()); + id = 10; + BasicTxnInfo txn10 = createTxnInfo(DB_NAME, TBL_NAME_2, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, id, id); + id = 9; + BasicTxnInfo txn9 = createTxnInfo(DB_NAME, TBL_NAME_1, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, id, id); + // Cleanup (current = 12, duration = 4) -> Removes txn1, txn2, txn3 + removed = MaterializationsInvalidationCache.get().cleanup(8L); + Assert.assertEquals(0L, removed); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1)); + Assert.assertTrue(invalidationInfos.isEmpty()); + // Create mv2 + Table mv2 = mock(Table.class); + when(mv2.getDbName()).thenReturn(DB_NAME); + when(mv2.getTableName()).thenReturn(MV_NAME_2); + CreationMetadata mockCM2 = new CreationMetadata( + DB_NAME, MV_NAME_2, + ImmutableSet.of( + DB_NAME + "." + TBL_NAME_1, + DB_NAME + "." + TBL_NAME_2)); + // Create txn list (highWatermark=10;minOpenTxn=Long.MAX_VALUE) + mockCM2.setValidTxnList("10:" + Long.MAX_VALUE + "::"); + when(mv2.getCreationMetadata()).thenReturn(mockCM2); + MaterializationsInvalidationCache.get().createMaterializedView(mockCM2.getDbName(), mockCM2.getTblName(), + mockCM2.getTablesUsed(), mockCM2.getValidTxnList()); + when(mv2.getCreationMetadata()).thenReturn(mockCM2); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertTrue(invalidationInfos.isEmpty()); + // Create tbl3 (nothing to do) + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_3, 11, 11); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_3, 18, 18); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 14, 14); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 17, 17); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, 16, 16); + // Cleanup (current = 20, duration = 4) -> Removes txn10, txn11 + removed = MaterializationsInvalidationCache.get().cleanup(16L); + Assert.assertEquals(0L, removed); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertTrue(invalidationInfos.isEmpty()); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 12, 12); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, 15, 15); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, 7, 7); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertTrue(invalidationInfos.isEmpty()); + // Cleanup (current = 24, duration = 4) -> Removes txn9, txn14, txn15, txn16, txn17, txn18 + removed = MaterializationsInvalidationCache.get().cleanup(20L); + Assert.assertEquals(0L, removed); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertTrue(invalidationInfos.isEmpty()); + // Cleanup (current = 28, duration = 4) -> Removes txn9 + removed = MaterializationsInvalidationCache.get().cleanup(24L); + Assert.assertEquals(0L, removed); + } + + @Test + public void testCleanerScenario2() throws Exception { + // create mock raw store + final RawStore rawStore = mock(RawStore.class); + when(rawStore.getAllDatabases()).thenReturn(ImmutableList.of()); + Configuration conf = new Configuration(); + conf.set("metastore.materializations.invalidation.impl", "DEFAULT"); + when(rawStore.getConf()).thenReturn(conf); + // create mock txn store + final TxnStore txnStore = mock(TxnStore.class); + // initialize invalidation cache (set conf to default) + MaterializationsInvalidationCache.get().init(rawStore, txnStore); + + // Scenario consists of the following steps: + // Create tbl1 + // (t = 1) Insert row in tbl1 + // (t = 2) Insert row in tbl1 + // Create tbl2 + // (t = 3) Insert row in tbl2 + // Cleanup (current = 4, duration = 4) -> Does nothing + // Create mv1 + // (t = 10) Insert row in tbl2 + // (t = 9) Insert row in tbl1 (out of order) + // Cleanup (current = 12, duration = 4) -> Removes txn1, txn2, txn3 + // Create mv2 + // Create tbl3 + // (t = 11) Insert row in tbl3 + // (t = 18) Insert row in tbl3 + // (t = 14) Insert row in tbl1 + // (t = 17) Insert row in tbl1 + // (t = 16) Insert row in tbl2 + // Cleanup (current = 20, duration = 4) -> Removes txn10, txn11 + // (t = 12) Insert row in tbl1 + // (t = 15) Insert row in tbl2 + // (t = 7) Insert row in tbl2 + // Cleanup (current = 24, duration = 4) -> Removes txn9, txn14, txn15, txn16, txn17, txn18 + // Create tbl1 (nothing to do) + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 1, 1); + int id = 2; + BasicTxnInfo txn2 = createTxnInfo(DB_NAME, TBL_NAME_1, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, id, id); + // Create tbl2 (nothing to do) + id = 3; + BasicTxnInfo txn3 = createTxnInfo(DB_NAME, TBL_NAME_1, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, id, id); + // Cleanup (current = 4, duration = 4) -> Does nothing + long removed = MaterializationsInvalidationCache.get().cleanup(0L); + Assert.assertEquals(0L, removed); + // Create mv1 + Table mv1 = mock(Table.class); + when(mv1.getDbName()).thenReturn(DB_NAME); + when(mv1.getTableName()).thenReturn(MV_NAME_1); + CreationMetadata mockCM1 = new CreationMetadata( + DB_NAME, MV_NAME_1, + ImmutableSet.of( + DB_NAME + "." + TBL_NAME_1, + DB_NAME + "." + TBL_NAME_2)); + // Create txn list (highWatermark=4;minOpenTxn=Long.MAX_VALUE) + mockCM1.setValidTxnList("3:" + Long.MAX_VALUE + "::"); + when(mv1.getCreationMetadata()).thenReturn(mockCM1); + MaterializationsInvalidationCache.get().createMaterializedView(mockCM1.getDbName(), mockCM1.getTblName(), + mockCM1.getTablesUsed(), mockCM1.getValidTxnList()); + Map invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1)); + Assert.assertEquals(0L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + id = 10; + BasicTxnInfo txn10 = createTxnInfo(DB_NAME, TBL_NAME_2, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, id, id); + id = 9; + BasicTxnInfo txn9 = createTxnInfo(DB_NAME, TBL_NAME_1, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, id, id); + // Cleanup (current = 12, duration = 4) -> Removes txn1, txn2, txn3 + removed = MaterializationsInvalidationCache.get().cleanup(8L); + Assert.assertEquals(3L, removed); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1)); + Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + // Create mv2 + Table mv2 = mock(Table.class); + when(mv2.getDbName()).thenReturn(DB_NAME); + when(mv2.getTableName()).thenReturn(MV_NAME_2); + CreationMetadata mockCM2 = new CreationMetadata( + DB_NAME, MV_NAME_2, + ImmutableSet.of( + DB_NAME + "." + TBL_NAME_1, + DB_NAME + "." + TBL_NAME_2)); + // Create txn list (highWatermark=10;minOpenTxn=Long.MAX_VALUE) + mockCM2.setValidTxnList("10:" + Long.MAX_VALUE + "::"); + when(mv2.getCreationMetadata()).thenReturn(mockCM2); + MaterializationsInvalidationCache.get().createMaterializedView(mockCM2.getDbName(), mockCM2.getTblName(), + mockCM2.getTablesUsed(), mockCM2.getValidTxnList()); + when(mv2.getCreationMetadata()).thenReturn(mockCM2); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + Assert.assertEquals(0L, invalidationInfos.get(MV_NAME_2).getInvalidationTime()); + // Create tbl3 (nothing to do) + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_3, 11, 11); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_3, 18, 18); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 14, 14); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 17, 17); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, 16, 16); + // Cleanup (current = 20, duration = 4) -> Removes txn10, txn11 + removed = MaterializationsInvalidationCache.get().cleanup(16L); + Assert.assertEquals(2L, removed); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + Assert.assertEquals(14L, invalidationInfos.get(MV_NAME_2).getInvalidationTime()); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 12, 12); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, 15, 15); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, 7, 7); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertEquals(7L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + Assert.assertEquals(12L, invalidationInfos.get(MV_NAME_2).getInvalidationTime()); + // Cleanup (current = 24, duration = 4) -> Removes txn9, txn14, txn15, txn16, txn17, txn18 + removed = MaterializationsInvalidationCache.get().cleanup(20L); + Assert.assertEquals(6L, removed); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertEquals(7L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + Assert.assertEquals(12L, invalidationInfos.get(MV_NAME_2).getInvalidationTime()); + // Cleanup (current = 28, duration = 4) -> Removes txn9 + removed = MaterializationsInvalidationCache.get().cleanup(24L); + Assert.assertEquals(0L, removed); + } + + private static BasicTxnInfo createTxnInfo(String dbName, String tableName, int i) { + BasicTxnInfo r = new BasicTxnInfo(); + r.setDbname(dbName); + r.setTablename(tableName); + r.setTxnid(i); + r.setTime(i); + return r; + } +}