diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index afee315378..3b71164171 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3255,6 +3255,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "The implementation that we should use for the materialized views registry. \n" + " DEFAULT: Default cache for materialized views\n" + " DUMMY: Do not cache materialized views and hence forward requests to metastore"), + HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH("hive.server2.materializedviews.registry.refresh.period", "60s", + new TimeValidator(TimeUnit.SECONDS), + "Period, specified in seconds, between successive refreshes of the registry to pull new materializations " + + "from the metastore that may have been created by other HS2 instances."), // HiveServer2 WebUI HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"), diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java index a9cb009191..2707987f0b 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java +++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java @@ -43,6 +43,7 @@ public static final String PARSE = "parse"; public static final String ANALYZE = "semanticAnalyze"; public static final String OPTIMIZER = "optimizer"; + public static final String MATERIALIZED_VIEWS_REGISTRY_REFRESH = "MaterializedViewsRegistryRefresh"; public static final String DO_AUTHORIZATION = "doAuthorization"; public static final String DRIVER_EXECUTE = "Driver.execute"; public static final String INPUT_SUMMARY = "getInputSummary"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java index de09a55ca4..21b456aa45 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java @@ -54,7 +54,7 @@ public int execute() throws HiveException { // TODO: API w/catalog name context.getDb().dropTable(desc.getTableName(), false); - HiveMaterializedViewsRegistry.get().dropMaterializedView(table); + HiveMaterializedViewsRegistry.get().dropMaterializedView(table.getDbName(), table.getTableName()); DDLUtils.addIfAbsentByName(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK), context); return 0; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java index ad6e163a4f..4154cb2c74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java @@ -60,6 +60,8 @@ public int execute() throws HiveException { ImmutableSet.copyOf(mvTable.getCreationMetadata().getTablesUsed())); cm.setValidTxnList(context.getConf().get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); context.getDb().updateCreationMetadata(mvTable.getDbName(), mvTable.getTableName(), cm); + mvTable.setCreationMetadata(cm); + HiveMaterializedViewsRegistry.get().createMaterializedView(context.getDb().getConf(), mvTable); } } catch (HiveException e) { 
LOG.debug("Exception during materialized view cache update", e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index db8cc6c99d..6143e85664 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1612,6 +1612,150 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) { } } + /** + * Get the materialized views that have been enabled for rewriting from the + * cache (registry). It will preprocess them to discard those that are + * outdated and augment those that need to be augmented, e.g., if incremental + * rewriting is enabled. + * + * @return the list of materialized views available for rewriting from the registry + * @throws HiveException + */ + public List getPreprocessedMaterializedViewsFromRegistry( + List tablesUsed, HiveTxnManager txnMgr) throws HiveException { + // From cache + List materializedViews = + HiveMaterializedViewsRegistry.get().getRewritingMaterializedViews(); + if (materializedViews.isEmpty()) { + // Bail out: empty list + return new ArrayList<>(); + } + // Add to final result + return filterAugmentMaterializedViews(materializedViews, tablesUsed, txnMgr); + } + + private List filterAugmentMaterializedViews(List materializedViews, + List tablesUsed, HiveTxnManager txnMgr) throws HiveException { + final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY); + final ValidTxnWriteIdList currentTxnWriteIds = txnMgr.getValidWriteIds(tablesUsed, validTxnsList); + final boolean tryIncrementalRewriting = + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_INCREMENTAL); + final long defaultTimeWindow = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW, + TimeUnit.MILLISECONDS); + try { + // Final result + List result = new ArrayList<>(); + for (RelOptMaterialization materialization : materializedViews) { + final RelNode viewScan = materialization.tableRel; + final Table materializedViewTable; + if (viewScan instanceof Project) { + // There is a Project on top (due to nullability) + materializedViewTable = ((RelOptHiveTable) viewScan.getInput(0).getTable()).getHiveTableMD(); + } else { + materializedViewTable = ((RelOptHiveTable) viewScan.getTable()).getHiveTableMD(); + } + final Boolean outdated = isOutdatedMaterializedView(materializedViewTable, currentTxnWriteIds, + defaultTimeWindow, tablesUsed, false); + if (outdated == null) { + continue; + } + + final CreationMetadata creationMetadata = materializedViewTable.getCreationMetadata(); + if (outdated) { + // The MV is outdated, see whether we should consider it for rewriting or not + if (!tryIncrementalRewriting) { + LOG.debug("Materialized view " + materializedViewTable.getFullyQualifiedName() + + " ignored for rewriting as its contents are outdated"); + continue; + } + // We will rewrite it to include the filters on transaction list + // so we can produce partial rewritings. + // This would be costly since we are doing it for every materialized view + // that is outdated, but it only happens for more than one materialized view + // if rewriting with outdated materialized views is enabled (currently + // disabled by default). 
+ materialization = augmentMaterializationWithTimeInformation( + materialization, validTxnsList, new ValidTxnWriteIdList( + creationMetadata.getValidTxnList())); + } + result.add(materialization); + } + return result; + } catch (Exception e) { + throw new HiveException(e); + } + } + + /** + * Validate that the materialized views retrieved from registry are still up-to-date. + * For those that are not, the method loads them from the metastore into the registry. + * + * @return true if they are up-to-date, otherwise false + * @throws HiveException + */ + public boolean validateMaterializedViewsFromRegistry(List cachedMaterializedViewTables, + List tablesUsed, HiveTxnManager txnMgr) throws HiveException { + final long defaultTimeWindow = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW, + TimeUnit.MILLISECONDS); + final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY); + final ValidTxnWriteIdList currentTxnWriteIds = txnMgr.getValidWriteIds(tablesUsed, validTxnsList); + try { + // Final result + boolean result = true; + for (Table cachedMaterializedViewTable : cachedMaterializedViewTables) { + // Retrieve the materialized view table from the metastore + final Table materializedViewTable = getTable( + cachedMaterializedViewTable.getDbName(), cachedMaterializedViewTable.getTableName()); + if (materializedViewTable == null || !materializedViewTable.isRewriteEnabled()) { + // This could happen if materialized view has been deleted or rewriting has been disabled. + // We remove it from the registry and set result to false. + HiveMaterializedViewsRegistry.get().dropMaterializedView(cachedMaterializedViewTable); + result = false; + } else { + final Boolean outdated = isOutdatedMaterializedView(cachedMaterializedViewTable, + currentTxnWriteIds, defaultTimeWindow, tablesUsed, false); + if (outdated == null) { + result = false; + continue; + } + // If the cached materialized view was not outdated wrt the query snapshot, + // then we know that the metastore version should be either the same or + // more recent. If it is more recent, snapshot isolation will shield us + // from the reading its contents after snapshot was acquired, but we will + // update the registry so we have most recent version. + // On the other hand, if the materialized view in the cache was outdated, + // we can only use it if the version that was in the cache is the same one + // that we can find in the metastore. 
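A minimal sketch (illustration only, not part of the patch): the per-view decision implemented below can be condensed into a hypothetical helper whose boolean inputs stand for the checks described in the comment above.

static boolean usableFromRegistry(boolean outdated, boolean matchesMetastore, boolean sourcesUpdatedOrDeleted) {
  if (!outdated) {
    // Snapshot isolation protects the read even if the metastore copy is newer.
    return true;
  }
  // An outdated cached plan is only usable when it is identical to the metastore
  // version and no source table saw update/delete operations in the meantime.
  return matchesMetastore && !sourcesUpdatedOrDeleted;
}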
+          if (outdated) {
+            if (!cachedMaterializedViewTable.equals(materializedViewTable)) {
+              // We ignore it and update the registry
+              HiveMaterializedViewsRegistry.get().refreshMaterializedView(conf, cachedMaterializedViewTable, materializedViewTable);
+              result = false;
+            } else {
+              // Obtain additional information to decide whether we should try incremental rewriting / rebuild.
+              // We will not try partial rewriting if there were update/delete operations on source tables
+              Materialization invalidationInfo = getMSC().getMaterializationInvalidationInfo(
+                  materializedViewTable.getCreationMetadata(), conf.get(ValidTxnList.VALID_TXNS_KEY));
+              if (invalidationInfo == null || invalidationInfo.isSourceTablesUpdateDeleteModified()) {
+                // We ignore it (as it did not meet the requirements), but we do not need to update it in the
+                // registry, since it is up-to-date
+                result = false;
+              }
+            }
+          } else if (!cachedMaterializedViewTable.equals(materializedViewTable)) {
+            // Update the registry
+            HiveMaterializedViewsRegistry.get().refreshMaterializedView(conf, cachedMaterializedViewTable, materializedViewTable);
+          }
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
   /**
    * Get the materialized views that have been enabled for rewriting from the
    * metastore. If the materialized view is in the cache, we do not need to
@@ -1623,33 +1767,42 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) {
    * @return the list of materialized views available for rewriting
    * @throws HiveException
    */
-  public List<RelOptMaterialization> getAllValidMaterializedViews(List<String> tablesUsed, boolean forceMVContentsUpToDate,
-      HiveTxnManager txnMgr) throws HiveException {
-    // Final result
-    List<RelOptMaterialization> result = new ArrayList<>();
-    try {
-      // From metastore (for security)
-      List<Table> materializedViews = getAllMaterializedViewObjectsForRewriting();
-      if (materializedViews.isEmpty()) {
-        // Bail out: empty list
-        return result;
-      }
-      result.addAll(getValidMaterializedViews(materializedViews,
-          tablesUsed, forceMVContentsUpToDate, txnMgr));
-      return result;
-    } catch (Exception e) {
-      throw new HiveException(e);
+  public List<RelOptMaterialization> getPreprocessedMaterializedViews(
+      List<String> tablesUsed, HiveTxnManager txnMgr)
+      throws HiveException {
+    // From metastore
+    List<Table> materializedViewTables =
+        getAllMaterializedViewObjectsForRewriting();
+    if (materializedViewTables.isEmpty()) {
+      // Bail out: empty list
+      return new ArrayList<>();
     }
+    // Return final result
+    return getValidMaterializedViews(materializedViewTables, tablesUsed, false, txnMgr);
   }
 
-  public List<RelOptMaterialization> getValidMaterializedView(String dbName, String materializedViewName,
-      List<String> tablesUsed, boolean forceMVContentsUpToDate, HiveTxnManager txnMgr) throws HiveException {
-    return getValidMaterializedViews(ImmutableList.of(getTable(dbName, materializedViewName)),
-        tablesUsed, forceMVContentsUpToDate, txnMgr);
+  /**
+   * Get the target materialized view from the metastore. Although it may load the plan
+   * from the registry, it is guaranteed that it will always return an up-to-date version
+   * wrt the metastore.
+   *
+   * @return the materialized view for rebuild
+   * @throws HiveException
+   */
+  public RelOptMaterialization getMaterializedViewForRebuild(String dbName, String materializedViewName,
+      List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
+    List<RelOptMaterialization> validMaterializedViews = getValidMaterializedViews(
+        ImmutableList.of(getTable(dbName, materializedViewName)), tablesUsed, true, txnMgr);
+    if (validMaterializedViews.isEmpty()) {
+      return null;
+    }
+    assert validMaterializedViews.size() == 1;
+    return validMaterializedViews.get(0);
   }
 
   private List<RelOptMaterialization> getValidMaterializedViews(List<Table> materializedViewTables,
-      List<String> tablesUsed, boolean forceMVContentsUpToDate, HiveTxnManager txnMgr) throws HiveException {
+      List<String> tablesUsed, boolean forceMVContentsUpToDate, HiveTxnManager txnMgr)
+      throws HiveException {
     final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY);
     final ValidTxnWriteIdList currentTxnWriteIds = txnMgr.getValidWriteIds(tablesUsed, validTxnsList);
     final boolean tryIncrementalRewriting =
@@ -1672,7 +1825,7 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) {
       final CreationMetadata creationMetadata = materializedViewTable.getCreationMetadata();
       if (outdated) {
         // The MV is outdated, see whether we should consider it for rewriting or not
-        boolean ignore = false;
+        boolean ignore;
         if (forceMVContentsUpToDate && !tryIncrementalRebuild) {
           // We will not try partial rewriting for rebuild if incremental rebuild is disabled
           ignore = true;
@@ -1706,8 +1859,7 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) {
       } else {
         cachedMaterializedViewTable = (RelOptHiveTable) viewScan.getTable();
       }
-      if (cachedMaterializedViewTable.getHiveTableMD().getCreateTime() ==
-          materializedViewTable.getCreateTime()) {
+      if (cachedMaterializedViewTable.getHiveTableMD().equals(materializedViewTable)) {
         // It is in the cache and up to date
         if (outdated) {
           // We will rewrite it to include the filters on transaction list
@@ -1722,31 +1874,23 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) {
       }
 
       // It was not present in the cache (maybe because it was added by another HS2)
-      // or it is not up to date.
-      if (HiveMaterializedViewsRegistry.get().isInitialized()) {
-        // But the registry was fully initialized, thus we need to add it
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Materialized view " + materializedViewTable.getFullyQualifiedName() +
-              " was not in the cache");
-        }
-        materialization = HiveMaterializedViewsRegistry.get().createMaterializedView(
-            conf, materializedViewTable);
-        if (materialization != null) {
-          if (outdated) {
-            // We will rewrite it to include the filters on transaction list
-            // so we can produce partial rewritings
-            materialization = augmentMaterializationWithTimeInformation(
-                materialization, validTxnsList, new ValidTxnWriteIdList(
-                    creationMetadata.getValidTxnList()));
-          }
-          result.add(materialization);
-        }
-      } else {
-        // Otherwise the registry has not been initialized, skip for the time being
-        if (LOG.isWarnEnabled()) {
-          LOG.info("Materialized view " + materializedViewTable.getFullyQualifiedName() + " was skipped "
-              + "because cache has not been loaded yet");
+      // or it is not up to date.
We need to add it + if (LOG.isDebugEnabled()) { + LOG.debug("Materialized view " + materializedViewTable.getFullyQualifiedName() + + " was not in the cache"); + } + materialization = HiveMaterializedViewsRegistry.get().createMaterialization( + conf, materializedViewTable); + if (materialization != null) { + HiveMaterializedViewsRegistry.get().refreshMaterializedView(conf, null, materializedViewTable); + if (outdated) { + // We will rewrite it to include the filters on transaction list + // so we can produce partial rewritings + materialization = augmentMaterializationWithTimeInformation( + materialization, validTxnsList, new ValidTxnWriteIdList( + creationMetadata.getValidTxnList())); } + result.add(materialization); } } return result; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java index c3392586f1..4592f5ec34 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java @@ -25,12 +25,14 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.ImmutableMap; +import java.util.function.BiFunction; import org.apache.calcite.adapter.druid.DruidQuery; import org.apache.calcite.adapter.druid.DruidSchema; import org.apache.calcite.adapter.druid.DruidTable; @@ -40,6 +42,7 @@ import org.apache.calcite.plan.RelOptMaterialization; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -49,10 +52,12 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; @@ -84,6 +89,7 @@ public final class HiveMaterializedViewsRegistry { private static final Logger LOG = LoggerFactory.getLogger(HiveMaterializedViewsRegistry.class); + private static final String CLASS_NAME = HiveMaterializedViewsRegistry.class.getName(); /* Singleton */ private static final HiveMaterializedViewsRegistry SINGLETON = new HiveMaterializedViewsRegistry(); @@ -138,11 +144,13 @@ public void init(Hive db) { LOG.info("Using dummy materialized views registry"); } else { // We initialize the cache - ExecutorService pool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("HiveMaterializedViewsRegistry-%d") - .build()); - pool.submit(new 
Loader(db)); - pool.shutdown(); + long period = HiveConf.getTimeVar(db.getConf(), ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH, TimeUnit.SECONDS); + ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("HiveMaterializedViewsRegistry-%d") + .build()); + pool.scheduleAtFixedRate(new Loader(db), 0, period, TimeUnit.SECONDS); } } @@ -155,20 +163,44 @@ private Loader(Hive db) { @Override public void run() { + SessionState ss = new SessionState(db.getConf()); + ss.setIsHiveServerQuery(true); // All is served from HS2, we do not need e.g. Tez sessions + SessionState.start(ss); + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.MATERIALIZED_VIEWS_REGISTRY_REFRESH); try { - SessionState ss = new SessionState(db.getConf()); - ss.setIsHiveServerQuery(true); // All is served from HS2, we do not need e.g. Tez sessions - SessionState.start(ss); - final boolean cache = !db.getConf() - .get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname).equals("DUMMY"); - for (Table mv : db.getAllMaterializedViewObjectsForRewriting()) { - addMaterializedView(db.getConf(), mv, OpType.LOAD, cache); + if (initialized.get()) { + for (Table mvTable : db.getAllMaterializedViewObjectsForRewriting()) { + RelOptMaterialization existingMV = getRewritingMaterializedView(mvTable.getDbName(), mvTable.getTableName()); + if (existingMV != null) { + // We replace if the existing MV is not newer + Table existingMVTable = extractTable(existingMV); + if (existingMVTable.getCreateTime() < mvTable.getCreateTime() || + (existingMVTable.getCreateTime() == mvTable.getCreateTime() && + existingMVTable.getCreationMetadata().getMaterializationTime() <= mvTable.getCreationMetadata().getMaterializationTime())) { + refreshMaterializedView(db.getConf(), existingMVTable, mvTable); + } + } else { + // Simply replace if it still does not exist + refreshMaterializedView(db.getConf(), null, mvTable); + } + } + LOG.info("Materialized views registry has been refreshed"); + } else { + for (Table mvTable : db.getAllMaterializedViewObjectsForRewriting()) { + refreshMaterializedView(db.getConf(), null, mvTable); + } + initialized.set(true); + LOG.info("Materialized views registry has been initialized"); } - initialized.set(true); - LOG.info("Materialized views registry has been initialized"); } catch (HiveException e) { - LOG.error("Problem connecting to the metastore when initializing the view registry", e); + if (initialized.get()) { + LOG.error("Problem connecting to the metastore when refreshing the view registry", e); + } else { + LOG.error("Problem connecting to the metastore when initializing the view registry", e); + } } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.MATERIALIZED_VIEWS_REGISTRY_REFRESH); } } @@ -177,43 +209,9 @@ public boolean isInitialized() { } /** - * Adds a newly created materialized view to the cache. - * - * @param materializedViewTable the materialized view + * Parses and creates a materialization. */ - public RelOptMaterialization createMaterializedView(HiveConf conf, Table materializedViewTable) { - final boolean cache = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname) - .equals("DUMMY"); - return addMaterializedView(conf, materializedViewTable, OpType.CREATE, cache); - } - - /** - * Adds the materialized view to the cache. 
- * - * @param materializedViewTable the materialized view - */ - private RelOptMaterialization addMaterializedView(HiveConf conf, Table materializedViewTable, - OpType opType, boolean cache) { - // Bail out if it is not enabled for rewriting - if (!materializedViewTable.isRewriteEnabled()) { - LOG.debug("Materialized view " + materializedViewTable.getCompleteName() + - " ignored; it is not rewrite enabled"); - return null; - } - - // We are going to create the map for each view in the given database - ConcurrentMap cq = - new ConcurrentHashMap(); - if (cache) { - // If we are caching the MV, we include it in the cache - final ConcurrentMap prevCq = materializedViews.putIfAbsent( - materializedViewTable.getDbName(), cq); - if (prevCq != null) { - cq = prevCq; - } - } - - // Start the process to add MV to the cache + public RelOptMaterialization createMaterialization(HiveConf conf, Table materializedViewTable) { // First we parse the view query and create the materialization object final String viewQuery = materializedViewTable.getViewExpandedText(); final RelNode viewScan = createMaterializedViewScan(conf, materializedViewTable); @@ -231,36 +229,126 @@ private RelOptMaterialization addMaterializedView(HiveConf conf, Table materiali return null; } - RelOptMaterialization materialization = new RelOptMaterialization(viewScan, queryRel, + return new RelOptMaterialization(viewScan, queryRel, null, viewScan.getTable().getQualifiedName()); - if (opType == OpType.CREATE) { - // You store the materialized view - cq.put(materializedViewTable.getTableName(), materialization); - } else { - // For LOAD, you only add it if it does exist as you might be loading an outdated MV - cq.putIfAbsent(materializedViewTable.getTableName(), materialization); + } + + /** + * Adds a newly created materialized view to the cache. + */ + public void createMaterializedView(HiveConf conf, Table materializedViewTable) { + final boolean cache = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname) + .equals("DUMMY"); + if (!cache) { + // Nothing to do, bail out + return; + } + + // Bail out if it is not enabled for rewriting + if (!materializedViewTable.isRewriteEnabled()) { + LOG.debug("Materialized view " + materializedViewTable.getCompleteName() + + " ignored; it is not rewrite enabled"); + return; + } + + // We are going to create the map for each view in the given database + ConcurrentMap dbMap = + new ConcurrentHashMap(); + // If we are caching the MV, we include it in the cache + final ConcurrentMap prevDbMap = materializedViews.putIfAbsent( + materializedViewTable.getDbName(), dbMap); + if (prevDbMap != null) { + dbMap = prevDbMap; } + RelOptMaterialization materialization = createMaterialization(conf, materializedViewTable); + if (materialization == null) { + return; + } + // You store the materialized view + dbMap.put(materializedViewTable.getTableName(), materialization); + if (LOG.isDebugEnabled()) { - LOG.debug("Created materialized view for rewriting: " + viewScan.getTable().getQualifiedName()); + LOG.debug("Created materialized view for rewriting: " + materializedViewTable.getFullyQualifiedName()); } - return materialization; } /** - * Removes the materialized view from the cache. - * - * @param materializedViewTable the materialized view to remove + * Update the materialized view in the registry (if existing materialized view matches). 
+ */ + public void refreshMaterializedView(HiveConf conf, Table oldMaterializedViewTable, Table materializedViewTable) { + final boolean cache = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname) + .equals("DUMMY"); + if (!cache) { + // Nothing to do, bail out + return; + } + + // Bail out if it is not enabled for rewriting + if (!materializedViewTable.isRewriteEnabled()) { + dropMaterializedView(oldMaterializedViewTable); + LOG.debug("Materialized view " + materializedViewTable.getCompleteName() + + " dropped; it is not rewrite enabled"); + return; + } + + // We are going to create the map for each view in the given database + ConcurrentMap dbMap = + new ConcurrentHashMap(); + // If we are caching the MV, we include it in the cache + final ConcurrentMap prevDbMap = materializedViews.putIfAbsent( + materializedViewTable.getDbName(), dbMap); + if (prevDbMap != null) { + dbMap = prevDbMap; + } + final RelOptMaterialization newMaterialization = createMaterialization(conf, materializedViewTable); + if (newMaterialization == null) { + return; + } + dbMap.compute(materializedViewTable.getTableName(), new BiFunction() { + @Override + public RelOptMaterialization apply(String tableName, RelOptMaterialization existingMaterialization) { + if (existingMaterialization == null) { + // If it was not existing, we just create it + return newMaterialization; + } + Table existingMaterializedViewTable = extractTable(existingMaterialization); + if (existingMaterializedViewTable.equals(oldMaterializedViewTable)) { + // If the old version is the same, we replace it + return newMaterialization; + } + // Otherwise, we return existing materialization + return existingMaterialization; + } + }); + + if (LOG.isDebugEnabled()) { + LOG.debug("Materialized view refreshed: " + materializedViewTable.getFullyQualifiedName()); + } + } + + /** + * Removes the materialized view from the cache (based on table object equality), if exists. */ public void dropMaterializedView(Table materializedViewTable) { - dropMaterializedView(materializedViewTable.getDbName(), materializedViewTable.getTableName()); + ConcurrentMap dbMap = materializedViews.get(materializedViewTable.getDbName()); + if (dbMap != null) { + // Delete only if the create time for the input materialized view table and the table + // in the map match. Otherwise, keep the one in the map. + dbMap.computeIfPresent(materializedViewTable.getTableName(), new BiFunction() { + @Override + public RelOptMaterialization apply(String tableName, RelOptMaterialization oldMaterialization) { + if (extractTable(oldMaterialization).equals(materializedViewTable)) { + return null; + } + return oldMaterialization; + } + }); + } } /** - * Removes the materialized view from the cache. - * - * @param dbName the db for the materialized view to remove - * @param tableName the name for the materialized view to remove + * Removes the materialized view from the cache (based on qualified name), if exists. */ public void dropMaterializedView(String dbName, String tableName) { ConcurrentMap dbMap = materializedViews.get(dbName); @@ -269,10 +357,20 @@ public void dropMaterializedView(String dbName, String tableName) { } } + /** + * Returns all the materialized views in the cache. 
+ * + * @return the collection of materialized views, or the empty collection if none + */ + List getRewritingMaterializedViews() { + List result = new ArrayList<>(); + materializedViews.forEach((dbName, mvs) -> result.addAll(mvs.values())); + return result; + } + /** * Returns the materialized views in the cache for the given database. * - * @param dbName the database * @return the collection of materialized views, or the empty collection if none */ RelOptMaterialization getRewritingMaterializedView(String dbName, String viewName) { @@ -422,6 +520,17 @@ private static TableType obtainTableType(Table tabMetaData) { return TableType.NATIVE; } + private static Table extractTable(RelOptMaterialization materialization) { + RelOptHiveTable cachedMaterializedViewTable; + if (materialization.tableRel instanceof Project) { + // There is a Project on top (due to nullability) + cachedMaterializedViewTable = (RelOptHiveTable) materialization.tableRel.getInput(0).getTable(); + } else { + cachedMaterializedViewTable = (RelOptHiveTable) materialization.tableRel.getTable(); + } + return cachedMaterializedViewTable.getHiveTableMD(); + } + //@TODO this seems to be the same as org.apache.hadoop.hive.ql.parse.CalcitePlanner.TableType.DRUID do we really need both private enum TableType { DRUID, @@ -429,9 +538,4 @@ private static TableType obtainTableType(Table tabMetaData) { JDBC } - private enum OpType { - CREATE, //view just being created - LOAD // already created view being loaded - } - -} \ No newline at end of file +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 60cd71583f..ef2ebac82c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -2247,7 +2247,10 @@ private RelNode applyMaterializedViewRewriting(RelOptPlanner planner, RelNode ba final RelOptCluster optCluster = basePlan.getCluster(); final PerfLogger perfLogger = SessionState.getPerfLogger(); + final boolean useMaterializedViewsRegistry = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname) + .equals("DUMMY"); final RelNode calcitePreMVRewritingPlan = basePlan; + final List tablesUsedQuery = getTablesUsed(basePlan); final boolean mvRebuild = mvRebuildMode != MaterializationRebuildMode.NONE; // Add views to planner @@ -2257,13 +2260,21 @@ private RelNode applyMaterializedViewRewriting(RelOptPlanner planner, RelNode ba // We only retrieve the materialization corresponding to the rebuild. In turn, // we pass 'true' for the forceMVContentsUpToDate parameter, as we cannot allow the // materialization contents to be stale for a rebuild if we want to use it. - materializations = db.getValidMaterializedView(mvRebuildDbName, mvRebuildName, - getTablesUsed(basePlan), true, getTxnMgr()); + RelOptMaterialization materialization = db.getMaterializedViewForRebuild( + mvRebuildDbName, mvRebuildName, tablesUsedQuery, getTxnMgr()); + if (materialization != null) { + materializations.add(materialization); + } } else { - // This is not a rebuild, we retrieve all the materializations. In turn, we do not need - // to force the materialization contents to be up-to-date, as this is not a rebuild, and - // we apply the user parameters (HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) instead. 
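For illustration, the conditional replacement that refreshMaterializedView performs above amounts to a compare-and-swap on the per-database map; a minimal sketch, with oldTable and newMaterialization assumed to be in scope:

dbMap.compute(tableName, (name, existing) -> {
  if (existing == null) {
    // Not cached yet: install the new plan.
    return newMaterialization;
  }
  // Swap only if the caller saw the version that is currently cached;
  // otherwise another writer won the race and its entry is kept.
  return extractTable(existing).equals(oldTable) ? newMaterialization : existing;
});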
- materializations = db.getAllValidMaterializedViews(getTablesUsed(basePlan), false, getTxnMgr()); + // This is not a rebuild, we retrieve all the materializations. + // In turn, we do not need to force the materialization contents to be up-to-date, + // as this is not a rebuild, and we apply the user parameters + // (HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) instead. + if (useMaterializedViewsRegistry) { + materializations = db.getPreprocessedMaterializedViewsFromRegistry(tablesUsedQuery, getTxnMgr()); + } else { + materializations = db.getPreprocessedMaterializedViews(tablesUsedQuery, getTxnMgr()); + } } // We need to use the current cluster for the scan operator on views, // otherwise the planner will throw an Exception (different planners) @@ -2310,80 +2321,101 @@ private RelNode copyNodeScan(RelNode scan) { } catch (HiveException e) { LOG.warn("Exception loading materialized views", e); } - if (!materializations.isEmpty()) { - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - if (mvRebuild) { - // If it is a materialized view rebuild, we use the HepPlanner, since we only have - // one MV and we would like to use it to create incremental maintenance plans - HepPlanner hepPlanner = createHepPlanner(basePlan.getCluster(), true, mdProvider, null, - HepMatchOrder.TOP_DOWN, HiveMaterializedViewRule.MATERIALIZED_VIEW_REWRITING_RULES); - // Add materialization for rebuild to planner - assert materializations.size() == 1; - hepPlanner.addMaterialization(materializations.get(0)); - // Optimize plan - hepPlanner.setRoot(basePlan); - basePlan = hepPlanner.findBestExp(); - } else { - // If this is not a rebuild, we use Volcano planner as the decision - // on whether to use MVs or not and which MVs to use should be cost-based - optCluster.invalidateMetadataQuery(); - RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.DEFAULT); - - // Add materializations to planner - for (RelOptMaterialization materialization : materializations) { - planner.addMaterialization(materialization); - } - // Add rule to split aggregate with grouping sets (if any) - planner.addRule(HiveAggregateSplitRule.INSTANCE); - // Add view-based rewriting rules to planner - for (RelOptRule rule : HiveMaterializedViewRule.MATERIALIZED_VIEW_REWRITING_RULES) { - planner.addRule(rule); - } - // Partition pruner rule - planner.addRule(HiveFilterProjectTSTransposeRule.INSTANCE); - planner.addRule(new HivePartitionPruneRule(conf)); - - // Optimize plan - planner.setRoot(basePlan); - basePlan = planner.findBestExp(); - // Remove view-based rewriting rules from planner - planner.clear(); - - // Restore default cost model - optCluster.invalidateMetadataQuery(); - RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(mdProvider)); + if (materializations.isEmpty()) { + // There are no materializations, we can return the original plan + return calcitePreMVRewritingPlan; + } + + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); + + if (mvRebuild) { + // If it is a materialized view rebuild, we use the HepPlanner, since we only have + // one MV and we would like to use it to create incremental maintenance plans + HepPlanner hepPlanner = createHepPlanner(basePlan.getCluster(), true, mdProvider, null, + HepMatchOrder.TOP_DOWN, HiveMaterializedViewRule.MATERIALIZED_VIEW_REWRITING_RULES); + // Add materialization for rebuild to planner + assert materializations.size() == 1; + hepPlanner.addMaterialization(materializations.get(0)); + // Optimize plan + 
      hepPlanner.setRoot(basePlan);
+      basePlan = hepPlanner.findBestExp();
+    } else {
+      // If this is not a rebuild, we use the Volcano planner, as the decision
+      // on whether to use MVs or not and which MVs to use should be cost-based
+      optCluster.invalidateMetadataQuery();
+      RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.DEFAULT);
+
+      // Add materializations to planner
+      for (RelOptMaterialization materialization : materializations) {
+        planner.addMaterialization(materialization);
+      }
+      // Add rule to split aggregate with grouping sets (if any)
+      planner.addRule(HiveAggregateSplitRule.INSTANCE);
+      // Add view-based rewriting rules to planner
+      for (RelOptRule rule : HiveMaterializedViewRule.MATERIALIZED_VIEW_REWRITING_RULES) {
+        planner.addRule(rule);
+      }
+      // Partition pruner rule
+      planner.addRule(HiveFilterProjectTSTransposeRule.INSTANCE);
+      planner.addRule(new HivePartitionPruneRule(conf));
-        // Optimize plan
-        planner.setRoot(basePlan);
-        basePlan = planner.findBestExp();
-        // Remove view-based rewriting rules from planner
-        planner.clear();
-
-        // Restore default cost model
-        optCluster.invalidateMetadataQuery();
-        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(mdProvider));
+      // Optimize plan
+      planner.setRoot(basePlan);
+      basePlan = planner.findBestExp();
+      // Remove view-based rewriting rules from planner
+      planner.clear();
+
+      // Restore default cost model
+      optCluster.invalidateMetadataQuery();
+      RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(mdProvider));
+    }
+
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: View-based rewriting");
+
+    List<Table> materializedViewsUsedOriginalPlan = getMaterializedViewsUsed(calcitePreMVRewritingPlan);
+    List<Table> materializedViewsUsedAfterRewrite = getMaterializedViewsUsed(basePlan);
+    if (materializedViewsUsedOriginalPlan.size() == materializedViewsUsedAfterRewrite.size()) {
+      // Materialized view-based rewriting did not happen, we can return the original plan
+      return calcitePreMVRewritingPlan;
+    }
+
+    // A rewriting was produced, we will check whether it was part of an incremental rebuild
+    // to try to replace INSERT OVERWRITE by INSERT or MERGE
+    if (mvRebuildMode == MaterializationRebuildMode.INSERT_OVERWRITE_REBUILD) {
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL)) {
+        // First we need to check if it is valid to convert to MERGE/INSERT INTO.
+        // If we succeed, we modify the plan and afterwards the AST.
+        // MV should be an acid table.
+        MaterializedViewRewritingRelVisitor visitor = new MaterializedViewRewritingRelVisitor();
+        visitor.go(basePlan);
+        if (visitor.isRewritingAllowed()) {
+          // Trigger rewriting to remove UNION branch with MV
+          if (visitor.isContainsAggregate()) {
+            basePlan = hepPlan(basePlan, false, mdProvider, null,
+                HepMatchOrder.TOP_DOWN, HiveAggregateIncrementalRewritingRule.INSTANCE);
+            mvRebuildMode = MaterializationRebuildMode.AGGREGATE_REBUILD;
+          } else {
+            basePlan = hepPlan(basePlan, false, mdProvider, null,
+                HepMatchOrder.TOP_DOWN, HiveNoAggregateIncrementalRewritingRule.INSTANCE);
+            mvRebuildMode = MaterializationRebuildMode.NO_AGGREGATE_REBUILD;
          }
        }
-      // Now we trigger some needed optimization rules again
-      basePlan = applyPreJoinOrderingTransforms(basePlan, mdProvider, executorProvider);
+      }
+    } else if (useMaterializedViewsRegistry) {
+      // Before proceeding we need to check whether the materialized views used are up-to-date
+      // wrt the information in the metastore
+      try {
+        if (!db.validateMaterializedViewsFromRegistry(materializedViewsUsedAfterRewrite, tablesUsedQuery, getTxnMgr())) {
+          return calcitePreMVRewritingPlan;
+        }
+      } catch (HiveException e) {
+        LOG.warn("Exception validating materialized views", e);
+        return calcitePreMVRewritingPlan;
+      }
     }
 
+    // Now we trigger some needed optimization rules again
+    basePlan = applyPreJoinOrderingTransforms(basePlan, mdProvider, executorProvider);
 
     if (mvRebuildMode == MaterializationRebuildMode.AGGREGATE_REBUILD) {
       // Make a cost-based decision factoring the configuration property
@@ -2392,7 +2424,7 @@ private RelNode copyNodeScan(RelNode scan) {
       RelMetadataQuery mq = RelMetadataQuery.instance();
       RelOptCost costOriginalPlan = mq.getCumulativeCost(calcitePreMVRewritingPlan);
       final double factorSelectivity = (double) HiveConf.getFloatVar(
-          conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL_FACTOR);
+          conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL_FACTOR);
       RelOptCost costRebuildPlan = mq.getCumulativeCost(basePlan).multiplyBy(factorSelectivity);
       if (costOriginalPlan.isLe(costRebuildPlan)) {
         basePlan = calcitePreMVRewritingPlan;
@@ -2420,6 +2452,30 @@ public void visit(RelNode node, int ordinal, RelNode parent) {
     return tablesUsed;
   }
 
+  private List<Table> getMaterializedViewsUsed(RelNode plan) {
+    List<Table> materializedViewsUsed = new ArrayList<>();
+    new RelVisitor() {
+      @Override
+      public void visit(RelNode node, int ordinal, RelNode parent) {
+        if (node instanceof TableScan) {
+          TableScan ts = (TableScan) node;
+          Table table = ((RelOptHiveTable) ts.getTable()).getHiveTableMD();
+          if (table.isMaterializedView()) {
+            materializedViewsUsed.add(table);
+          }
+        } else if (node instanceof DruidQuery) {
+          DruidQuery dq = (DruidQuery) node;
+          Table table = ((RelOptHiveTable) dq.getTable()).getHiveTableMD();
+          if (table.isMaterializedView()) {
+            materializedViewsUsed.add(table);
+          }
+        }
+        super.visit(node, ordinal, parent);
+      }
+    }.go(plan);
+    return materializedViewsUsed;
+  }
+
   /**
    * Run the HEP Planner with the given rule set.
    *
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 1865d77000..7b0e5e0c5e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -115,6 +115,7 @@
 import org.apache.hadoop.hive.ql.ddl.view.AlterMaterializedViewRewriteDesc;
 import org.apache.hadoop.hive.ql.ddl.view.DropMaterializedViewDesc;
 import org.apache.hadoop.hive.ql.ddl.view.DropViewDesc;
+import org.apache.hadoop.hive.ql.ddl.view.MaterializedViewUpdateDesc;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
@@ -3274,7 +3275,22 @@ private void analyzeAlterMaterializedViewRewrite(String fqMvName, ASTNode ast) t
     inputs.add(new ReadEntity(materializedViewTable));
     outputs.add(new WriteEntity(materializedViewTable, WriteEntity.WriteType.DDL_EXCLUSIVE));
-    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterMVRewriteDesc)));
+
+    // Create task for alterMVRewriteDesc
+    DDLWork work = new DDLWork(getInputs(), getOutputs(), alterMVRewriteDesc);
+    Task<?> targetTask = TaskFactory.get(work);
+
+    // Create task to update the rewrite flag as dependent on the previous one
+    String tableName = alterMVRewriteDesc.getMaterializedViewName();
+    boolean retrieveAndInclude = alterMVRewriteDesc.isRewriteEnable();
+    boolean disableRewrite = !alterMVRewriteDesc.isRewriteEnable();
+    MaterializedViewUpdateDesc materializedViewUpdateDesc =
+        new MaterializedViewUpdateDesc(tableName, retrieveAndInclude, disableRewrite, false);
+    DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), materializedViewUpdateDesc);
+    targetTask.addDependentTask(TaskFactory.get(ddlWork, conf));
+
+    // Add root task
+    rootTasks.add(targetTask);
   }
 }
diff --git a/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q b/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q
index a3f437ed93..f617eaff57 100644
--- a/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q
+++ b/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q
@@ -32,8 +32,8 @@ SELECT * FROM src_txn_2 where value > 'val_220' and value < 'val_230';
 SELECT * FROM src_txn_2 where value > 'val_220' and value < 'val_230';
 
--- SHOULD CHOOSE partition_mv_4 SINCE IT IS THE MOST EFFICIENT
--- READING ONLY ONE PARTITION
+-- SHOULD CHOOSE partition_mv_1, partition_mv_3 OR partition_mv_4
+-- SINCE EACH OF THEM READS ONLY ONE PARTITION
 EXPLAIN
 SELECT * FROM src_txn_2 where key > 224 and key < 226;
 
diff --git a/ql/src/test/results/clientnegative/strict_pruning_2.q.out
b/ql/src/test/results/clientnegative/strict_pruning_2.q.out index b50d411cb8..85c16b9837 100644 --- a/ql/src/test/results/clientnegative/strict_pruning_2.q.out +++ b/ql/src/test/results/clientnegative/strict_pruning_2.q.out @@ -1 +1 @@ -FAILED: SemanticException [Error 10056]: Queries against partitioned tables without a partition filter are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.no.partition.filter to false and make sure that hive.mapred.mode is not set to 'strict' to proceed. Note that you may get errors or incorrect results if you make a mistake while using some of the unsafe features. No partition predicate for Alias "srcpart" Table "srcpart" +FAILED: SemanticException [Error 10056]: Queries against partitioned tables without a partition filter are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.no.partition.filter to false and make sure that hive.mapred.mode is not set to 'strict' to proceed. Note that you may get errors or incorrect results if you make a mistake while using some of the unsafe features. No partition predicate for Alias "default.srcpart" Table "srcpart" diff --git a/ql/src/test/results/clientpositive/beeline/materialized_view_create_rewrite.q.out b/ql/src/test/results/clientpositive/beeline/materialized_view_create_rewrite.q.out index 3fd6b20473..6c77549ae7 100644 --- a/ql/src/test/results/clientpositive/beeline/materialized_view_create_rewrite.q.out +++ b/ql/src/test/results/clientpositive/beeline/materialized_view_create_rewrite.q.out @@ -260,6 +260,7 @@ POSTHOOK: Input: default@cmv_mat_view2_n4 POSTHOOK: Output: default@cmv_mat_view2_n4 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -267,6 +268,11 @@ STAGE PLANS: name: default.cmv_mat_view2_n4 enable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view2_n4 + retrieveAndInclude: true + PREHOOK: query: alter materialized view cmv_mat_view2_n4 enable rewrite PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view2_n4 diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out index 53f723387e..93a1103465 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out @@ -272,6 +272,7 @@ POSTHOOK: Input: default@cmv_mat_view2_n4 POSTHOOK: Output: default@cmv_mat_view2_n4 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -279,6 +280,11 @@ STAGE PLANS: name: default.cmv_mat_view2_n4 enable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view2_n4 + retrieveAndInclude: true + PREHOOK: query: alter materialized view cmv_mat_view2_n4 enable rewrite PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view2_n4 diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out index 81d69b18ed..cf93cff14b 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out @@ -467,6 +467,7 @@ POSTHOOK: Input: default@cmv_mat_view_n5 POSTHOOK: Output: 
default@cmv_mat_view_n5 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -474,6 +475,11 @@ STAGE PLANS: name: default.cmv_mat_view_n5 enable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view_n5 + retrieveAndInclude: true + PREHOOK: query: ALTER MATERIALIZED VIEW cmv_mat_view_n5 ENABLE REWRITE PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view_n5 diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out index 25c89a546c..62b74da7b2 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out @@ -147,6 +147,7 @@ POSTHOOK: Input: default@cmv_mat_view2 POSTHOOK: Output: default@cmv_mat_view2 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -154,6 +155,11 @@ STAGE PLANS: name: default.cmv_mat_view2 disable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view2 + disableRewrite: true + PREHOOK: query: alter materialized view cmv_mat_view2 disable rewrite PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view2 @@ -291,6 +297,7 @@ POSTHOOK: Input: default@cmv_mat_view2 POSTHOOK: Output: default@cmv_mat_view2 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -298,6 +305,11 @@ STAGE PLANS: name: default.cmv_mat_view2 enable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view2 + retrieveAndInclude: true + PREHOOK: query: alter materialized view cmv_mat_view2 enable rewrite PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view2 diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out index 7da22c0616..5f7c4f6b3f 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out @@ -467,6 +467,7 @@ POSTHOOK: Input: default@cmv_mat_view_n3 POSTHOOK: Output: default@cmv_mat_view_n3 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -474,6 +475,11 @@ STAGE PLANS: name: default.cmv_mat_view_n3 enable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view_n3 + retrieveAndInclude: true + PREHOOK: query: ALTER MATERIALIZED VIEW cmv_mat_view_n3 ENABLE REWRITE PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view_n3 diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out index d9c76fbf06..6726c1e9d0 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out @@ -525,13 +525,13 @@ POSTHOOK: Input: default@src_txn_2 PREHOOK: query: EXPLAIN SELECT * FROM src_txn_2 where key > 224 and key < 226 PREHOOK: type: QUERY -PREHOOK: Input: default@partition_mv_4 +PREHOOK: Input: default@partition_mv_1 PREHOOK: 
Input: default@src_txn_2 #### A masked pattern was here #### POSTHOOK: query: EXPLAIN SELECT * FROM src_txn_2 where key > 224 and key < 226 POSTHOOK: type: QUERY -POSTHOOK: Input: default@partition_mv_4 +POSTHOOK: Input: default@partition_mv_1 POSTHOOK: Input: default@src_txn_2 #### A masked pattern was here #### STAGE DEPENDENCIES: @@ -543,7 +543,7 @@ STAGE PLANS: limit: -1 Processor Tree: TableScan - alias: default.partition_mv_4 + alias: default.partition_mv_1 filterExpr: ((UDFToDouble(key) > 224.0D) and (UDFToDouble(key) < 226.0D)) (type: boolean) Filter Operator predicate: ((UDFToDouble(key) > 224.0D) and (UDFToDouble(key) < 226.0D)) (type: boolean) @@ -554,14 +554,14 @@ STAGE PLANS: PREHOOK: query: SELECT * FROM src_txn_2 where key > 223 and key < 225 PREHOOK: type: QUERY -PREHOOK: Input: default@partition_mv_4 -PREHOOK: Input: default@partition_mv_4@key=224 +PREHOOK: Input: default@partition_mv_1 +PREHOOK: Input: default@partition_mv_1@key=224 PREHOOK: Input: default@src_txn_2 #### A masked pattern was here #### POSTHOOK: query: SELECT * FROM src_txn_2 where key > 223 and key < 225 POSTHOOK: type: QUERY -POSTHOOK: Input: default@partition_mv_4 -POSTHOOK: Input: default@partition_mv_4@key=224 +POSTHOOK: Input: default@partition_mv_1 +POSTHOOK: Input: default@partition_mv_1@key=224 POSTHOOK: Input: default@src_txn_2 #### A masked pattern was here #### 224 val_224 diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 6029d11e67..504e6b12a1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -38,6 +38,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.SortedSet; @@ -2002,14 +2003,6 @@ public Materialization getMaterializationInvalidationInfo( return null; } - // Parse validTxnList - final ValidReadTxnList validTxnList = - new ValidReadTxnList(validTxnListStr); - - // Parse validReaderWriteIdList from creation metadata - final ValidTxnWriteIdList validReaderWriteIdList = - new ValidTxnWriteIdList(creationMetadata.getValidTxnList()); - // We are composing a query that returns a single row if an update happened after // the materialization was created. Otherwise, query returns 0 rows. 
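As a concrete illustration with hypothetical snapshot values (write-id high watermark 42, invalid write ids {17, 23}, transaction high watermark 100, open transactions {88, 93}), the statement assembled below would render roughly as:

String example =
    "select ctc_update_delete from COMPLETED_TXN_COMPONENTS where ctc_update_delete='Y' AND ("
        + " (ctc_database=? AND ctc_table=? AND (ctc_writeid > 42 OR ctc_writeid IN(17,23)))"
        + ") AND ctc_txnid <= 100 AND ctc_txnid NOT IN(88,93) ";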
    Connection dbConn = null;
    Statement stmt = null;
    ResultSet rs = null;
    try {
      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+
+      // Parse validReaderWriteIdList from creation metadata
+      final ValidTxnWriteIdList validReaderWriteIdList =
+          new ValidTxnWriteIdList(creationMetadata.getValidTxnList());
+
+      // Parse validTxnList
+      final ValidReadTxnList currentValidTxnList = new ValidReadTxnList(validTxnListStr);
+      // Get the valid write id list for the tables in the current state
+      final List<TableValidWriteIds> currentTblValidWriteIdsList = new ArrayList<>();
+      for (String fullTableName : creationMetadata.getTablesUsed()) {
+        currentTblValidWriteIdsList.add(getValidWriteIdsForTable(dbConn, fullTableName, currentValidTxnList));
+      }
+      final ValidTxnWriteIdList currentValidReaderWriteIdList = TxnCommonUtils.createValidTxnWriteIdList(
+          currentValidTxnList.getHighWatermark(), currentTblValidWriteIdsList);
+
      List<String> params = new ArrayList<>();
      StringBuilder query = new StringBuilder();
      // compose a query that select transactions containing an update...
      query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS where ctc_update_delete='Y' AND (");
      int i = 0;
      for (String fullyQualifiedName : creationMetadata.getTablesUsed()) {
+        ValidWriteIdList tblValidWriteIdList =
+            validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
+        if (tblValidWriteIdList == null) {
+          LOG.warn("ValidWriteIdList for table {} not present in creation metadata, this should not happen", fullyQualifiedName);
+          return null;
+        }
+
+        // First, we check whether the low watermark has moved for any of the tables.
+        // If it has, we bail out and return null, since the materialization is not
+        // incrementally refreshable, e.g., one of the commits that are not available
+        // may be an update/delete.
+        ValidWriteIdList currentTblValidWriteIdList =
+            currentValidReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
+        if (currentTblValidWriteIdList == null) {
+          LOG.warn("Current ValidWriteIdList for table {} not present in the current snapshot, this should not happen", fullyQualifiedName);
+          return null;
+        }
+        if (!Objects.equals(currentTblValidWriteIdList.getMinOpenWriteId(), tblValidWriteIdList.getMinOpenWriteId())) {
+          LOG.debug("Minimum open write ids do not match for table {}", fullyQualifiedName);
+          return null;
+        }
+
        // ...for each of the tables that are part of the materialized view,
        // where the transaction had to be committed after the materialization was created...
        if (i != 0) {
@@ -2033,12 +2062,6 @@ public Materialization getMaterializationInvalidationInfo(
        query.append(" (ctc_database=? AND ctc_table=?");
        params.add(names[0]);
        params.add(names[1]);
-        ValidWriteIdList tblValidWriteIdList =
-            validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
-        if (tblValidWriteIdList == null) {
-          LOG.warn("ValidWriteIdList for table {} not present in creation metadata, this should not happen", fullyQualifiedName);
-          return null;
-        }
        query.append(" AND (ctc_writeid > " + tblValidWriteIdList.getHighWatermark());
        query.append(tblValidWriteIdList.getInvalidWriteIds().length == 0 ? ") " :
            " OR ctc_writeid IN(" + StringUtils.join(",",
@@ -2048,10 +2071,10 @@
      }
      // ... and where the transaction has already been committed as per snapshot taken
      // when we are running current query
-      query.append(") AND ctc_txnid <= " + validTxnList.getHighWatermark());
-      query.append(validTxnList.getInvalidTransactions().length == 0 ?
" " : + query.append(") AND ctc_txnid <= " + currentValidTxnList.getHighWatermark()); + query.append(currentValidTxnList.getInvalidTransactions().length == 0 ? " " : " AND ctc_txnid NOT IN(" + StringUtils.join(",", - Arrays.asList(ArrayUtils.toObject(validTxnList.getInvalidTransactions()))) + ") "); + Arrays.asList(ArrayUtils.toObject(currentValidTxnList.getInvalidTransactions()))) + ") "); // Execute query String s = query.toString();