diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 05c2acd0ff..128854e5c7 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1149,7 +1149,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal // materialized views HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING("hive.materializedview.rewriting", false, "Whether to try to rewrite queries using the materialized views enabled for rewriting"), - HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW("hive.materializedview.rewriting.time.window", 0, + HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW("hive.materializedview.rewriting.time.window", "0s", new TimeValidator(TimeUnit.SECONDS), "Time window, specified in seconds, after which outdated materialized views become invalid for automatic query rewriting.\n" + "For instance, if a materialized view is created and afterwards one of its source tables is changed at " + "moment in time t0, the materialized view will not be considered for rewriting anymore after t0 plus " + @@ -1160,6 +1160,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Default file format for CREATE MATERIALIZED VIEW statement"), HIVE_MATERIALIZED_VIEW_SERDE("hive.materializedview.serde", "org.apache.hadoop.hive.ql.io.orc.OrcSerde", "Default SerDe used for materialized views"), + HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY("hive.metastore.materializations.invalidation.clean.frequency", + "3600s", new TimeValidator(TimeUnit.SECONDS), "Frequency at which timer task runs to remove unnecessary transactions information from " + + "materializations invalidation cache."), + HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION("hive.metastore.materializations.invalidation.max.duration", + "86400s", new TimeValidator(TimeUnit.SECONDS), "Maximum duration for query producing a 
materialization. After this time, transactions " + + "information that is not relevant for materializations can be removed from invalidation cache."), // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row, // need to remove by hive .13. Also, do not change default (see SMB operator) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 23983d85b3..c914ca71d1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -49,6 +49,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -1553,7 +1554,8 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) { * @throws HiveException */ public List getValidMaterializedViews() throws HiveException { - final long diff = conf.getIntVar(HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) * 1000; + final long diff = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW, TimeUnit.MILLISECONDS); final long minTime = System.currentTimeMillis() - diff; try { // Final result diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java new file mode 100644 index 0000000000..e8037cf960 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class MaterializationsCacheCleanerTask implements MetastoreTaskThread { + private static final Logger LOG = LoggerFactory.getLogger(MaterializationsCacheCleanerTask.class); + + private Configuration conf; + + @Override + public long runFrequency(TimeUnit unit) { + return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY, unit); + } + + @Override + public void setConf(Configuration configuration) { + conf = configuration; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void run() { + long removedCnt = MaterializationsInvalidationCache.get().cleanup(System.currentTimeMillis() - + MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION, TimeUnit.MILLISECONDS)); + if (removedCnt > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Number of transaction entries deleted from materializations cache: " + removedCnt); + } + } + } +} diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java index de912d5630..2178816852 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.metastore; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -26,6 +27,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; import org.apache.hadoop.hive.metastore.api.Materialization; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -367,4 +370,54 @@ public String toString() { ALTER } + /** + * Removes transaction events that are not relevant anymore. 
+ * @param minTime events generated before this time (ms) can be deleted from the cache + * @return number of events that were deleted from the cache + */ + public long cleanup(long minTime) { + if (!initialized) { + // Bail out + return 0L; + } + final Multimap keepTxnInfos = HashMultimap.create(); + for (Map.Entry> e : materializations.entrySet()) { + for (MaterializationInvalidationInfo m : e.getValue().values()) { + for (String t : m.getTablesUsed()) { + keepTxnInfos.put(t, m.getMaterializationTable().getCreationMetadata().get(t).getId()); + } + } + } + long removed = 0L; + for (Map.Entry> e : tableModifications.entrySet()) { + for (Iterator it = e.getValue().iterator(); it.hasNext();) { + TableModificationKey v = it.next(); + // To remove, mv should meet two conditions: + // 1) Current time - time of transaction > config parameter, and + // 2) Transaction should not be associated with a MV and should not follow transaction + // associated with MV + if (v.time > minTime) { + // We can stop iterating, as we do not remove the transactions after minTime + break; + } + boolean remove = true; + while (keepTxnInfos.get(e.getKey()).contains(v.id)) { + // We skip this entry as we do not need to remove it, + // we move to next entry + remove = false; + if (!it.hasNext()) { + break; + } + v = it.next(); + } + if (remove) { + // We can remove this entry safely as it met both conditions mentioned above + it.remove(); + removed++; + } + } + } + return removed; + } + } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 3c8d005e3c..b22352dab6 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader; import org.apache.hadoop.hive.metastore.HiveAlterHandler; +import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.events.EventCleanerTask; import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager; @@ -539,6 +540,14 @@ public static ConfVars getMetaConf(String name) { "javax.jdo.PersistenceManagerFactoryClass", "org.datanucleus.api.jdo.JDOPersistenceManagerFactory", "class implementing the jdo persistence"), + MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY("metastore.materializations.invalidation.clean.frequency", + "hive.metastore.materializations.invalidation.clean.frequency", + 3600, TimeUnit.SECONDS, "Frequency at which timer task runs to remove unnecessary transaction entries from " + + "materializations invalidation cache."), + MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION("metastore.materializations.invalidation.max.duration", + "hive.metastore.materializations.invalidation.max.duration", + 86400, TimeUnit.SECONDS, "Maximum duration for query producing a materialization. After this time, transaction " + + "entries that are not relevant for materializations can be removed from invalidation cache."), // Parameters for exporting metadata on table drop (requires the use of the) // org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener METADATA_EXPORT_LOCATION("metastore.metadata.export.location", "hive.metadata.export.location", @@ -694,7 +703,9 @@ public static ConfVars getMetaConf(String name) { + "The only supported special character right now is '/'. 
This flag applies only to quoted table names.\n" + "The default value is true."), TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always", - EventCleanerTask.class.getName() + "," + "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask", + EventCleanerTask.class.getName() + "," + + "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," + + MaterializationsCacheCleanerTask.class.getName(), "Comma separated list of tasks that will be started in separate threads. These will " + "always be started, regardless of whether the metastore is running in embedded mode " + "or in server mode. They must implement " + MetastoreTaskThread.class.getName()), diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java new file mode 100644 index 0000000000..6dd93c6ad5 --- /dev/null +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockLevel; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.Materialization; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.iq80.leveldb.DB; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import static junit.framework.Assert.assertEquals; +import static org.mockito.Mockito.mock; 
+import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache}. + * The tests focus on arrival of notifications (possibly out of order) and the logic + * to clean up the materializations cache. + */ +public class TestMetaStoreMaterializationsCacheCleaner { + + private static final String DB_NAME = "hive3252"; + private static final String TBL_NAME_1 = "tmptbl1"; + private static final String TBL_NAME_2 = "tmptbl2"; + private static final String TBL_NAME_3 = "tmptbl3"; + private static final String MV_NAME_1 = "mv1"; + private static final String MV_NAME_2 = "mv2"; + + + private RawStore rawStore; + private TxnStore txnStore; + + + @Before + public void setUp() throws Exception { + // create mock raw store + rawStore = mock(RawStore.class); + when(rawStore.getAllDatabases()).thenReturn(ImmutableList.of()); + // create mock txn store + txnStore = mock(TxnStore.class); + } + + @Test + public void testCleanerScenario1() throws Exception { + // initialize invalidation cache + MaterializationsInvalidationCache.get().init(rawStore, txnStore); + + // Scenario consists of the following steps: + // Create tbl1 + // (t = 1) Insert row in tbl1 + // (t = 2) Insert row in tbl1 + // Create tbl2 + // (t = 3) Insert row in tbl2 + // Cleanup (current = 4, duration = 4) -> Does nothing + // Create mv1 + // (t = 10) Insert row in tbl2 + // (t = 9) Insert row in tbl1 (out of order) + // Cleanup (current = 12, duration = 4) -> Removes txn1 + // Create mv2 + // Create tbl3 + // (t = 11) Insert row in tbl3 + // (t = 18) Insert row in tbl3 + // (t = 14) Insert row in tbl1 + // (t = 17) Insert row in tbl1 + // (t = 16) Insert row in tbl2 + // Cleanup (current = 20, duration = 4) -> Removes txn11 + // (t = 12) Insert row in tbl1 + // (t = 15) Insert row in tbl2 + // (t = 7) Insert row in tbl2 + // Cleanup (current = 24, duration = 4) -> Removes txn14, txn16, txn17, txn18 + // Create tbl1 (nothing to do) + 
MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 1, 1); + int id = 2; + BasicTxnInfo txn2 = createTxnInfo(DB_NAME, TBL_NAME_1, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, id, id); + // Create tbl2 (nothing to do) + id = 3; + BasicTxnInfo txn3 = createTxnInfo(DB_NAME, TBL_NAME_1, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, id, id); + // Cleanup (current = 4, duration = 4) -> Does nothing + long removed = MaterializationsInvalidationCache.get().cleanup(0L); + Assert.assertEquals(0L, removed); + // Create mv1 + Table mv1 = mock(Table.class); + when(mv1.getDbName()).thenReturn(DB_NAME); + when(mv1.getTableName()).thenReturn(MV_NAME_1); + when(mv1.getCreationMetadata()).thenReturn( + ImmutableMap.of( + DB_NAME + "." + TBL_NAME_1, txn2, + DB_NAME + "." + TBL_NAME_2, txn3)); + MaterializationsInvalidationCache.get().createMaterializedView( + mv1, ImmutableSet.of(DB_NAME + "." + TBL_NAME_1, DB_NAME + "." 
+ TBL_NAME_2)); + Map invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1)); + Assert.assertEquals(0L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + id = 10; + BasicTxnInfo txn10 = createTxnInfo(DB_NAME, TBL_NAME_2, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, id, id); + id = 9; + BasicTxnInfo txn9 = createTxnInfo(DB_NAME, TBL_NAME_1, id); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, id, id); + // Cleanup (current = 12, duration = 4) -> Removes txn1 + removed = MaterializationsInvalidationCache.get().cleanup(8L); + Assert.assertEquals(1L, removed); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1)); + Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + // Create mv2 + Table mv2 = mock(Table.class); + when(mv2.getDbName()).thenReturn(DB_NAME); + when(mv2.getTableName()).thenReturn(MV_NAME_2); + when(mv2.getCreationMetadata()).thenReturn( + ImmutableMap.of( + DB_NAME + "." + TBL_NAME_1, txn9, + DB_NAME + "." + TBL_NAME_2, txn10)); + MaterializationsInvalidationCache.get().createMaterializedView( + mv2, ImmutableSet.of(DB_NAME + "." + TBL_NAME_1, DB_NAME + "." 
+ TBL_NAME_2)); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + Assert.assertEquals(0L, invalidationInfos.get(MV_NAME_2).getInvalidationTime()); + // Create tbl3 (nothing to do) + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_3, 11, 11); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_3, 18, 18); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 14, 14); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 17, 17); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, 16, 16); + // Cleanup (current = 20, duration = 4) -> Removes txn11 + removed = MaterializationsInvalidationCache.get().cleanup(16L); + Assert.assertEquals(1L, removed); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + Assert.assertEquals(14L, invalidationInfos.get(MV_NAME_2).getInvalidationTime()); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_1, 12, 12); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, 15, 15); + MaterializationsInvalidationCache.get().notifyTableModification( + DB_NAME, TBL_NAME_2, 7, 7); + // Cleanup (current = 24, duration = 4) -> Removes txn14, txn16, txn17, txn18 + removed = MaterializationsInvalidationCache.get().cleanup(20L); + Assert.assertEquals(4L, removed); + invalidationInfos = + MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( + DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); + Assert.assertEquals(7L, 
invalidationInfos.get(MV_NAME_1).getInvalidationTime()); + Assert.assertEquals(12L, invalidationInfos.get(MV_NAME_2).getInvalidationTime()); + } + + private static BasicTxnInfo createTxnInfo(String dbName, String tableName, int i) { + BasicTxnInfo r = new BasicTxnInfo(); + r.setDbname(dbName); + r.setTablename(tableName); + r.setId(i); + r.setTime(i); + return r; + } +}