diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9fd7dcab4c..96451e56df 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3249,6 +3249,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "The implementation that we should use for the materialized views registry. \n" +
         "  DEFAULT: Default cache for materialized views\n" +
         "  DUMMY: Do not cache materialized views and hence forward requests to metastore"),
+    HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH("hive.server2.materializedviews.registry.refresh.period", "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Period, specified in seconds, between successive refreshes of the registry to pull new materializations " +
+        "from the metastore that may have been created by other HS2 instances."),
     // HiveServer2 WebUI
     HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java
index de09a55ca4..21b456aa45 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java
@@ -54,7 +54,7 @@ public int execute() throws HiveException {
     // TODO: API w/catalog name
     context.getDb().dropTable(desc.getTableName(), false);
-    HiveMaterializedViewsRegistry.get().dropMaterializedView(table);
+    HiveMaterializedViewsRegistry.get().dropMaterializedView(table.getDbName(), table.getTableName());
     DDLUtils.addIfAbsentByName(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK), context);
 
     return 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java
index ad6e163a4f..59861ed62c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java
@@ -59,6 +59,8 @@ public int execute() throws HiveException {
             mvTable.getDbName(), mvTable.getTableName(),
             ImmutableSet.copyOf(mvTable.getCreationMetadata().getTablesUsed()));
         cm.setValidTxnList(context.getConf().get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+        mvTable.setCreationMetadata(cm);
+        HiveMaterializedViewsRegistry.get().createMaterializedView(context.getDb().getConf(), mvTable);
         context.getDb().updateCreationMetadata(mvTable.getDbName(), mvTable.getTableName(), cm);
       }
     } catch (HiveException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 0028818d74..611d14f70c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1559,42 +1559,206 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) {
   /**
    * Get the materialized views that have been enabled for rewriting from the
-   * metastore. If the materialized view is in the cache, we do not need to
-   * parse it to generate a logical plan for the rewriting. Instead, we
-   * return the version present in the cache. Further, information provided
-   * by the invalidation cache is useful to know whether a materialized view
-   * can be used for rewriting or not.
+   * cache (registry). It will preprocess them to discard those that are
+   * outdated and augment those that need to be augmented, e.g., if incremental
+   * rewriting is enabled.
    *
-   * @return the list of materialized views available for rewriting
+   * @return the list of materialized views available for rewriting from the registry
    * @throws HiveException
    */
-  public List<RelOptMaterialization> getAllValidMaterializedViews(List<String> tablesUsed, boolean forceMVContentsUpToDate,
-      HiveTxnManager txnMgr) throws HiveException {
+  public List<RelOptMaterialization> getPreprocessedMaterializedViewsFromRegistry(
+      List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
     // Final result
     List<RelOptMaterialization> result = new ArrayList<>();
-    try {
-      // From metastore (for security)
-      List<Table> materializedViews = getAllMaterializedViewObjectsForRewriting();
+    if (HiveMaterializedViewsRegistry.get().isInitialized()) {
+      // From cache
+      List<RelOptMaterialization> materializedViews =
+          HiveMaterializedViewsRegistry.get().getRewritingMaterializedViews();
       if (materializedViews.isEmpty()) {
         // Bail out: empty list
         return result;
       }
-      result.addAll(getValidMaterializedViews(materializedViews,
-          tablesUsed, forceMVContentsUpToDate, txnMgr));
+      // Add to final result
+      result.addAll(
+          filterAugmentMaterializedViews(materializedViews, tablesUsed, txnMgr));
+    } else {
+      // Otherwise the registry has not been initialized, skip for the time being
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Materialized views were skipped because the cache has not been loaded yet");
+      }
+    }
+    return result;
+  }
+
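Taken together, the registry entry point above and the metastore entry point further below give the planner a fast path and a safe fallback. A minimal caller-side sketch of the selection (the retrieveMaterializations helper is hypothetical; the same branching appears in the CalcitePlanner hunk later in this patch):

    // Sketch only: choose the registry fast path unless the registry is DUMMY.
    // Assumes db (Hive), conf (HiveConf), tablesUsed (List<String>) and
    // txnMgr (HiveTxnManager) are in scope.
    private List<RelOptMaterialization> retrieveMaterializations(Hive db, HiveConf conf,
        List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
      boolean useRegistry = !conf.get(
          HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname).equals("DUMMY");
      if (useRegistry) {
        // Pre-parsed plans from the cache; empty until the background Loader
        // finishes its first pass (isInitialized() returns false until then).
        return db.getPreprocessedMaterializedViewsFromRegistry(tablesUsed, txnMgr);
      }
      // DUMMY registry: fetch the definitions from the metastore and parse them now.
      return db.getPreprocessedMaterializedViews(tablesUsed, txnMgr);
    }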
+  private List<RelOptMaterialization> filterAugmentMaterializedViews(List<RelOptMaterialization> materializedViews,
+      List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
+    final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    final ValidTxnWriteIdList currentTxnWriteIds = txnMgr.getValidWriteIds(tablesUsed, validTxnsList);
+    final boolean tryIncrementalRewriting =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_INCREMENTAL);
+    final long defaultTimeWindow =
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW,
+            TimeUnit.MILLISECONDS);
+    try {
+      // Final result
+      List<RelOptMaterialization> result = new ArrayList<>();
+      for (RelOptMaterialization materialization : materializedViews) {
+        final RelNode viewScan = materialization.tableRel;
+        final Table materializedViewTable;
+        if (viewScan instanceof Project) {
+          // There is a Project on top (due to nullability)
+          materializedViewTable = ((RelOptHiveTable) viewScan.getInput(0).getTable()).getHiveTableMD();
+        } else {
+          materializedViewTable = ((RelOptHiveTable) viewScan.getTable()).getHiveTableMD();
+        }
+        final Boolean outdated = isOutdatedMaterializedView(materializedViewTable, currentTxnWriteIds,
+            defaultTimeWindow, tablesUsed, false);
+        if (outdated == null) {
+          continue;
+        }
+
+        final CreationMetadata creationMetadata = materializedViewTable.getCreationMetadata();
+        if (outdated) {
+          // The MV is outdated, see whether we should consider it for rewriting or not
+          if (!tryIncrementalRewriting) {
+            LOG.debug("Materialized view " + materializedViewTable.getFullyQualifiedName() +
+                " ignored for rewriting as its contents are outdated");
+            continue;
+          }
+          // We will rewrite it to include the filters on transaction list
+          // so we can produce partial rewritings.
+          // This would be costly since we are doing it for every materialized view
+          // that is outdated, but it only happens for more than one materialized view
+          // if rewriting with outdated materialized views is enabled (currently
+          // disabled by default).
+          materialization = augmentMaterializationWithTimeInformation(
+              materialization, validTxnsList, new ValidTxnWriteIdList(
+                  creationMetadata.getValidTxnList()));
+        }
+        result.add(materialization);
+      }
       return result;
     } catch (Exception e) {
       throw new HiveException(e);
     }
   }
 
-  public List<RelOptMaterialization> getValidMaterializedView(String dbName, String materializedViewName,
-      List<String> tablesUsed, boolean forceMVContentsUpToDate, HiveTxnManager txnMgr) throws HiveException {
-    return getValidMaterializedViews(ImmutableList.of(getTable(dbName, materializedViewName)),
-        tablesUsed, forceMVContentsUpToDate, txnMgr);
+  /**
+   * Validate that the materialized views retrieved from the registry are still up-to-date.
+   * For those that are not, the method loads them from the metastore into the registry.
+   *
+   * @return true if they are up-to-date, otherwise false
+   * @throws HiveException
+   */
+  public boolean validateMaterializedViewsFromRegistry(List<Table> cachedMaterializedViewTables,
+      List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
+    final long defaultTimeWindow =
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW,
+            TimeUnit.MILLISECONDS);
+    final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    final ValidTxnWriteIdList currentTxnWriteIds = txnMgr.getValidWriteIds(tablesUsed, validTxnsList);
+    try {
+      // Final result
+      boolean result = true;
+      for (Table cachedMaterializedViewTable : cachedMaterializedViewTables) {
+        // Retrieve the materialized view table from the metastore
+        final Table materializedViewTable = getTable(
+            cachedMaterializedViewTable.getDbName(), cachedMaterializedViewTable.getTableName());
+        if (materializedViewTable == null || !materializedViewTable.isRewriteEnabled()) {
+          // This could happen if the materialized view has been deleted or rewriting has been disabled.
+          // We remove it from the registry and set the result to false.
+          HiveMaterializedViewsRegistry.get().dropMaterializedView(cachedMaterializedViewTable);
+          result = false;
+        } else {
+          final Boolean outdated = isOutdatedMaterializedView(cachedMaterializedViewTable,
+              currentTxnWriteIds, defaultTimeWindow, tablesUsed, true);
+          if (outdated == null) {
+            result = false;
+            continue;
+          }
+          // If the cached materialized view was not outdated wrt the query snapshot,
+          // then we know that the metastore version should be either the same or
+          // more recent. If it is more recent, snapshot isolation will shield us
+          // from reading its contents after the snapshot was acquired, but we will
+          // update the registry so we have the most recent version.
+          // On the other hand, if the materialized view in the cache was outdated,
+          // we can only use it if the version that was in the cache is the same one
+          // that we can find in the metastore.
+          if (outdated) {
+            if (!cachedMaterializedViewTable.equals(materializedViewTable)) {
+              // We ignore it and update the registry
+              HiveMaterializedViewsRegistry.get().refreshMaterializedView(conf, cachedMaterializedViewTable, materializedViewTable);
+              result = false;
+            } else {
+              // Obtain additional information if we should try incremental rewriting / rebuild
+              // We will not try partial rewriting if there were update/delete operations on source tables
+              Materialization invalidationInfo = getMSC().getMaterializationInvalidationInfo(
+                  materializedViewTable.getCreationMetadata(), conf.get(ValidTxnList.VALID_TXNS_KEY));
+              if (invalidationInfo == null || invalidationInfo.isSourceTablesUpdateDeleteModified()) {
+                // We ignore it (as it did not meet the requirements), but we do not need to update it in the
+                // registry, since it is up-to-date
+                result = false;
+              }
+            }
+          } else if (!cachedMaterializedViewTable.equals(materializedViewTable)) {
+            // Update the registry
+            HiveMaterializedViewsRegistry.get().refreshMaterializedView(conf, cachedMaterializedViewTable, materializedViewTable);
+          }
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
+   * Get the materialized views that have been enabled for rewriting from the
+   * metastore. If the materialized view is in the cache, we do not need to
+   * parse it to generate a logical plan for the rewriting. Instead, we
+   * return the version present in the cache. Further, information provided
+   * by the invalidation cache is useful to know whether a materialized view
+   * can be used for rewriting or not.
+   *
+   * @return the list of materialized views available for rewriting
+   * @throws HiveException
+   */
+  public List<RelOptMaterialization> getPreprocessedMaterializedViews(
+      List<String> tablesUsed, HiveTxnManager txnMgr)
+      throws HiveException {
+    // From metastore
+    List<Table> materializedViewTables =
+        getAllMaterializedViewObjectsForRewriting();
+    if (materializedViewTables.isEmpty()) {
+      // Bail out: empty list
+      return new ArrayList<>();
+    }
+    // Return final result
+    return getValidMaterializedViews(materializedViewTables, tablesUsed, false, txnMgr);
+  }
+
+  /**
+   * Get the target materialized view from the metastore. Although it may load the plan
+   * from the registry, it is guaranteed that it will always return an up-to-date version
+   * wrt the metastore.
+   *
+   * @return the materialized view for rebuild
+   * @throws HiveException
+   */
+  public RelOptMaterialization getMaterializedViewForRebuild(String dbName, String materializedViewName,
+      List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
+    List<RelOptMaterialization> validMaterializedViews = getValidMaterializedViews(
+        ImmutableList.of(getTable(dbName, materializedViewName)), tablesUsed, true, txnMgr);
+    if (validMaterializedViews.isEmpty()) {
+      return null;
+    }
+    assert validMaterializedViews.size() == 1;
+    return validMaterializedViews.get(0);
+  }
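The rebuild path stays metastore-backed so its contents are guaranteed current. A hedged sketch of how a consumer uses getMaterializedViewForRebuild (the database/view names and the planner variable are illustrative; planner is assumed to be a Calcite RelOptPlanner):

    // Sketch: rebuild-time consumption of the single-view API defined above.
    RelOptMaterialization materialization = db.getMaterializedViewForRebuild(
        "default", "mv1", tablesUsed, txnMgr);
    if (materialization != null) {
      // At most one materialization is returned for the target view.
      planner.addMaterialization(materialization);
    }
    // On null, the contents could not be forced up-to-date (e.g., incremental
    // rebuild disabled while sources changed), so no rewriting is attempted and
    // the rebuild runs as a plain INSERT OVERWRITE.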
 
   private List<RelOptMaterialization> getValidMaterializedViews(List<Table> materializedViewTables,
-      List<String> tablesUsed, boolean forceMVContentsUpToDate, HiveTxnManager txnMgr) throws HiveException {
+      List<String> tablesUsed, boolean forceMVContentsUpToDate, HiveTxnManager txnMgr)
+      throws HiveException {
     final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY);
     final ValidTxnWriteIdList currentTxnWriteIds = txnMgr.getValidWriteIds(tablesUsed, validTxnsList);
     final boolean tryIncrementalRewriting =
@@ -1617,7 +1781,7 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) {
         final CreationMetadata creationMetadata = materializedViewTable.getCreationMetadata();
         if (outdated) {
           // The MV is outdated, see whether we should consider it for rewriting or not
-          boolean ignore = false;
+          boolean ignore;
           if (forceMVContentsUpToDate && !tryIncrementalRebuild) {
             // We will not try partial rewriting for rebuild if incremental rebuild is disabled
             ignore = true;
@@ -1651,8 +1815,7 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) {
           } else {
             cachedMaterializedViewTable = (RelOptHiveTable) viewScan.getTable();
           }
-          if (cachedMaterializedViewTable.getHiveTableMD().getCreateTime() ==
-              materializedViewTable.getCreateTime()) {
+          if (cachedMaterializedViewTable.getHiveTableMD().equals(materializedViewTable)) {
             // It is in the cache and up to date
             if (outdated) {
               // We will rewrite it to include the filters on transaction list
@@ -1674,9 +1837,10 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) {
               LOG.debug("Materialized view " + materializedViewTable.getFullyQualifiedName() +
                   " was not in the cache");
             }
-            materialization = HiveMaterializedViewsRegistry.get().createMaterializedView(
+            materialization = HiveMaterializedViewsRegistry.get().createMaterialization(
                 conf, materializedViewTable);
             if (materialization != null) {
+              HiveMaterializedViewsRegistry.get().refreshMaterializedView(conf, null, materializedViewTable);
               if (outdated) {
                 // We will rewrite it to include the filters on transaction list
                 // so we can produce partial rewritings
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index c3392586f1..e31de1505c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -25,12 +25,14 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.function.BiFunction;
 import org.apache.calcite.adapter.druid.DruidQuery;
 import org.apache.calcite.adapter.druid.DruidSchema;
 import org.apache.calcite.adapter.druid.DruidTable;
@@ -40,6 +42,7 @@
 import org.apache.calcite.plan.RelOptMaterialization;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -49,6 +52,7 @@
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -138,11 +142,13 @@ public void init(Hive db) {
       LOG.info("Using dummy materialized views registry");
     } else {
       // We initialize the cache
-      ExecutorService pool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
-          .setNameFormat("HiveMaterializedViewsRegistry-%d")
-          .build());
-      pool.submit(new Loader(db));
-      pool.shutdown();
+      long period = HiveConf.getTimeVar(db.getConf(), ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH, TimeUnit.SECONDS);
+      ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("HiveMaterializedViewsRegistry-%d")
+              .build());
+      pool.scheduleAtFixedRate(new Loader(db), 0, period, TimeUnit.SECONDS);
     }
   }
 
@@ -159,15 +165,36 @@ public void run() {
       SessionState ss = new SessionState(db.getConf());
       ss.setIsHiveServerQuery(true); // All is served from HS2, we do not need e.g. Tez sessions
       SessionState.start(ss);
-      final boolean cache = !db.getConf()
-          .get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname).equals("DUMMY");
-      for (Table mv : db.getAllMaterializedViewObjectsForRewriting()) {
-        addMaterializedView(db.getConf(), mv, OpType.LOAD, cache);
+      if (initialized.get()) {
+        for (Table mvTable : db.getAllMaterializedViewObjectsForRewriting()) {
+          RelOptMaterialization existingMV = getRewritingMaterializedView(mvTable.getDbName(), mvTable.getTableName());
+          if (existingMV != null) {
+            // We replace the existing MV only if it is not newer than the incoming one
+            Table existingMVTable = extractTable(existingMV);
+            if (existingMVTable.getCreateTime() < mvTable.getCreateTime() ||
+                (existingMVTable.getCreateTime() == mvTable.getCreateTime() &&
+                    existingMVTable.getCreationMetadata().getMaterializationTime() <= mvTable.getCreationMetadata().getMaterializationTime())) {
+              refreshMaterializedView(db.getConf(), existingMVTable, mvTable);
+            }
+          } else {
+            // Simply add it, since it does not exist in the registry yet
+            refreshMaterializedView(db.getConf(), null, mvTable);
+          }
+        }
+        LOG.info("Materialized views registry has been refreshed");
+      } else {
+        for (Table mvTable : db.getAllMaterializedViewObjectsForRewriting()) {
+          refreshMaterializedView(db.getConf(), null, mvTable);
+        }
+        initialized.set(true);
+        LOG.info("Materialized views registry has been initialized");
       }
-      initialized.set(true);
-      LOG.info("Materialized views registry has been initialized");
     } catch (HiveException e) {
-      LOG.error("Problem connecting to the metastore when initializing the view registry", e);
+      if (initialized.get()) {
+        LOG.error("Problem connecting to the metastore when refreshing the view registry", e);
+      } else {
+        LOG.error("Problem connecting to the metastore when initializing the view registry", e);
+      }
     }
   }
 }
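The refresh pass above only overwrites a cached entry when the incoming metastore copy is at least as fresh. That ordering can be isolated into a small predicate — a sketch only, with primitive parameters standing in for Table.getCreateTime() and CreationMetadata.getMaterializationTime():

    // Sketch of the Loader's freshness test: the incoming definition wins if it
    // was created later, or created at the same time but materialized no earlier.
    static boolean incomingIsAtLeastAsFresh(int cachedCreateTime, long cachedMaterializationTime,
        int incomingCreateTime, long incomingMaterializationTime) {
      return cachedCreateTime < incomingCreateTime
          || (cachedCreateTime == incomingCreateTime
              && cachedMaterializationTime <= incomingMaterializationTime);
    }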
@@ -177,43 +204,9 @@ public boolean isInitialized() {
   }
 
   /**
-   * Adds a newly created materialized view to the cache.
-   *
-   * @param materializedViewTable the materialized view
+   * Parses and creates a materialization.
    */
-  public RelOptMaterialization createMaterializedView(HiveConf conf, Table materializedViewTable) {
-    final boolean cache = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname)
-        .equals("DUMMY");
-    return addMaterializedView(conf, materializedViewTable, OpType.CREATE, cache);
-  }
-
-  /**
-   * Adds the materialized view to the cache.
-   *
-   * @param materializedViewTable the materialized view
-   */
-  private RelOptMaterialization addMaterializedView(HiveConf conf, Table materializedViewTable,
-      OpType opType, boolean cache) {
-    // Bail out if it is not enabled for rewriting
-    if (!materializedViewTable.isRewriteEnabled()) {
-      LOG.debug("Materialized view " + materializedViewTable.getCompleteName() +
-          " ignored; it is not rewrite enabled");
-      return null;
-    }
-
-    // We are going to create the map for each view in the given database
-    ConcurrentMap<String, RelOptMaterialization> cq =
-        new ConcurrentHashMap<String, RelOptMaterialization>();
-    if (cache) {
-      // If we are caching the MV, we include it in the cache
-      final ConcurrentMap<String, RelOptMaterialization> prevCq = materializedViews.putIfAbsent(
-          materializedViewTable.getDbName(), cq);
-      if (prevCq != null) {
-        cq = prevCq;
-      }
-    }
-
-    // Start the process to add MV to the cache
+  public RelOptMaterialization createMaterialization(HiveConf conf, Table materializedViewTable) {
     // First we parse the view query and create the materialization object
     final String viewQuery = materializedViewTable.getViewExpandedText();
     final RelNode viewScan = createMaterializedViewScan(conf, materializedViewTable);
@@ -231,36 +224,126 @@ private RelOptMaterialization addMaterializedView(HiveConf conf, Table materiali
       return null;
     }
 
-    RelOptMaterialization materialization = new RelOptMaterialization(viewScan, queryRel,
+    return new RelOptMaterialization(viewScan, queryRel,
         null, viewScan.getTable().getQualifiedName());
-    if (opType == OpType.CREATE) {
-      // You store the materialized view
-      cq.put(materializedViewTable.getTableName(), materialization);
-    } else {
-      // For LOAD, you only add it if it does not exist as you might be loading an outdated MV
-      cq.putIfAbsent(materializedViewTable.getTableName(), materialization);
+  }
+
+  /**
+   * Adds a newly created materialized view to the cache.
+   */
+  public void createMaterializedView(HiveConf conf, Table materializedViewTable) {
+    final boolean cache = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname)
+        .equals("DUMMY");
+    if (!cache) {
+      // Nothing to do, bail out
+      return;
+    }
+
+    // Bail out if it is not enabled for rewriting
+    if (!materializedViewTable.isRewriteEnabled()) {
+      LOG.debug("Materialized view " + materializedViewTable.getCompleteName() +
+          " ignored; it is not rewrite enabled");
+      return;
+    }
+
+    // We are going to create the map for each view in the given database
+    ConcurrentMap<String, RelOptMaterialization> dbMap =
+        new ConcurrentHashMap<String, RelOptMaterialization>();
+    // If we are caching the MV, we include it in the cache
+    final ConcurrentMap<String, RelOptMaterialization> prevDbMap = materializedViews.putIfAbsent(
+        materializedViewTable.getDbName(), dbMap);
+    if (prevDbMap != null) {
+      dbMap = prevDbMap;
     }
+    RelOptMaterialization materialization = createMaterialization(conf, materializedViewTable);
+    if (materialization == null) {
+      return;
+    }
+    // We store the materialized view
+    dbMap.put(materializedViewTable.getTableName(), materialization);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Created materialized view for rewriting: " + viewScan.getTable().getQualifiedName());
+      LOG.debug("Created materialized view for rewriting: " + materializedViewTable.getFullyQualifiedName());
     }
-    return materialization;
   }
 
   /**
-   * Removes the materialized view from the cache.
-   *
-   * @param materializedViewTable the materialized view to remove
+   * Update the materialized view in the registry (if the existing materialized view matches).
+   */
+  public void refreshMaterializedView(HiveConf conf, Table oldMaterializedViewTable, Table materializedViewTable) {
+    final boolean cache = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname)
+        .equals("DUMMY");
+    if (!cache) {
+      // Nothing to do, bail out
+      return;
+    }
+
+    // Bail out if it is not enabled for rewriting
+    if (!materializedViewTable.isRewriteEnabled()) {
+      dropMaterializedView(oldMaterializedViewTable);
+      LOG.debug("Materialized view " + materializedViewTable.getCompleteName() +
+          " dropped; it is not rewrite enabled");
+      return;
+    }
+
+    // We are going to create the map for each view in the given database
+    ConcurrentMap<String, RelOptMaterialization> dbMap =
+        new ConcurrentHashMap<String, RelOptMaterialization>();
+    // If we are caching the MV, we include it in the cache
+    final ConcurrentMap<String, RelOptMaterialization> prevDbMap = materializedViews.putIfAbsent(
+        materializedViewTable.getDbName(), dbMap);
+    if (prevDbMap != null) {
+      dbMap = prevDbMap;
+    }
+    final RelOptMaterialization newMaterialization = createMaterialization(conf, materializedViewTable);
+    if (newMaterialization == null) {
+      return;
+    }
+    dbMap.compute(materializedViewTable.getTableName(), new BiFunction<String, RelOptMaterialization, RelOptMaterialization>() {
+      @Override
+      public RelOptMaterialization apply(String tableName, RelOptMaterialization existingMaterialization) {
+        if (existingMaterialization == null) {
+          // If it does not exist yet, we just create it
+          return newMaterialization;
+        }
+        Table existingMaterializedViewTable = extractTable(existingMaterialization);
+        if (existingMaterializedViewTable.equals(oldMaterializedViewTable)) {
+          // If the old version is the same, we replace it
+          return newMaterialization;
+        }
+        // Otherwise, we return the existing materialization
+        return existingMaterialization;
+      }
+    });
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Materialized view refreshed: " + materializedViewTable.getFullyQualifiedName());
+    }
+  }
+
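refreshMaterializedView relies on ConcurrentMap.compute so that the check against the expected old version and the write happen atomically per key; a concurrent DDL statement and a background refresh cannot interleave between the read and the write. A self-contained, JDK-only sketch of the same compare-and-replace pattern:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class CompareAndReplaceExample {
      public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("mv1", "plan-v1");

        // Replace only if the cached value is still the one the update was based
        // on; the remapping function runs atomically for the key.
        String expectedOld = "plan-v1";
        String replacement = "plan-v2";
        cache.compute("mv1", (key, existing) ->
            expectedOld.equals(existing) ? replacement : existing);

        System.out.println(cache.get("mv1")); // prints plan-v2
      }
    }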
+  /**
+   * Removes the materialized view from the cache (based on table object equality), if it exists.
+   */
   public void dropMaterializedView(Table materializedViewTable) {
-    dropMaterializedView(materializedViewTable.getDbName(), materializedViewTable.getTableName());
+    ConcurrentMap<String, RelOptMaterialization> dbMap = materializedViews.get(materializedViewTable.getDbName());
+    if (dbMap != null) {
+      // Delete only if the input materialized view table and the table in the
+      // map are equal. Otherwise, keep the one in the map.
+      dbMap.computeIfPresent(materializedViewTable.getTableName(), new BiFunction<String, RelOptMaterialization, RelOptMaterialization>() {
+        @Override
+        public RelOptMaterialization apply(String tableName, RelOptMaterialization oldMaterialization) {
+          if (extractTable(oldMaterialization).equals(materializedViewTable)) {
+            return null;
+          }
+          return oldMaterialization;
+        }
+      });
+    }
   }
 
   /**
-   * Removes the materialized view from the cache.
-   *
-   * @param dbName the db for the materialized view to remove
-   * @param tableName the name for the materialized view to remove
+   * Removes the materialized view from the cache (based on qualified name), if it exists.
    */
   public void dropMaterializedView(String dbName, String tableName) {
     ConcurrentMap<String, RelOptMaterialization> dbMap = materializedViews.get(dbName);
@@ -269,10 +352,20 @@ public void dropMaterializedView(String dbName, String tableName) {
     }
   }
 
+  /**
+   * Returns all the materialized views in the cache.
+   *
+   * @return the collection of materialized views, or the empty collection if none
+   */
+  List<RelOptMaterialization> getRewritingMaterializedViews() {
+    List<RelOptMaterialization> result = new ArrayList<>();
+    materializedViews.forEach((dbName, mvs) -> result.addAll(mvs.values()));
+    return result;
+  }
+
   /**
    * Returns the materialized views in the cache for the given database.
    *
-   * @param dbName the database
    * @return the collection of materialized views, or the empty collection if none
    */
   RelOptMaterialization getRewritingMaterializedView(String dbName, String viewName) {
@@ -422,6 +515,17 @@ private static TableType obtainTableType(Table tabMetaData) {
     return TableType.NATIVE;
   }
 
+  private static Table extractTable(RelOptMaterialization materialization) {
+    RelOptHiveTable cachedMaterializedViewTable;
+    if (materialization.tableRel instanceof Project) {
+      // There is a Project on top (due to nullability)
+      cachedMaterializedViewTable = (RelOptHiveTable) materialization.tableRel.getInput(0).getTable();
+    } else {
+      cachedMaterializedViewTable = (RelOptHiveTable) materialization.tableRel.getTable();
+    }
+    return cachedMaterializedViewTable.getHiveTableMD();
+  }
+
   //@TODO this seems to be the same as org.apache.hadoop.hive.ql.parse.CalcitePlanner.TableType.DRUID do we really need both
   private enum TableType {
     DRUID,
@@ -429,9 +533,4 @@ private static TableType obtainTableType(Table tabMetaData) {
     JDBC
   }
 
-  private enum OpType {
-    CREATE, //view just being created
-    LOAD // already created view being loaded
-  }
-
-}
\ No newline at end of file
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 212d27a3bc..58c724e623 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -2242,7 +2242,10 @@ private RelNode applyMaterializedViewRewriting(RelOptPlanner planner, RelNode ba
     final RelOptCluster optCluster = basePlan.getCluster();
     final PerfLogger perfLogger = SessionState.getPerfLogger();
 
+    final boolean useMaterializedViewsRegistry = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname)
+        .equals("DUMMY");
     final RelNode calcitePreMVRewritingPlan = basePlan;
+    final List<String> tablesUsedQuery = getTablesUsed(basePlan);
     final boolean mvRebuild = mvRebuildMode != MaterializationRebuildMode.NONE;
 
     // Add views to planner
@@ -2252,13 +2255,21 @@ private RelNode applyMaterializedViewRewriting(RelOptPlanner planner, RelNode ba
       // We only retrieve the materialization corresponding to the rebuild. In turn,
       // we pass 'true' for the forceMVContentsUpToDate parameter, as we cannot allow the
       // materialization contents to be stale for a rebuild if we want to use it.
-      materializations = db.getValidMaterializedView(mvRebuildDbName, mvRebuildName,
-          getTablesUsed(basePlan), true, getTxnMgr());
+      RelOptMaterialization materialization = db.getMaterializedViewForRebuild(
+          mvRebuildDbName, mvRebuildName, tablesUsedQuery, getTxnMgr());
+      if (materialization != null) {
+        materializations.add(materialization);
+      }
     } else {
-      // This is not a rebuild, we retrieve all the materializations. In turn, we do not need
-      // to force the materialization contents to be up-to-date, as this is not a rebuild, and
-      // we apply the user parameters (HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) instead.
-      materializations = db.getAllValidMaterializedViews(getTablesUsed(basePlan), false, getTxnMgr());
+      // This is not a rebuild, we retrieve all the materializations.
+      // In turn, we do not need to force the materialization contents to be up-to-date,
+      // as this is not a rebuild, and we apply the user parameters
+      // (HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) instead.
+      if (useMaterializedViewsRegistry) {
+        materializations = db.getPreprocessedMaterializedViewsFromRegistry(tablesUsedQuery, getTxnMgr());
+      } else {
+        materializations = db.getPreprocessedMaterializedViews(tablesUsedQuery, getTxnMgr());
+      }
     }
     // We need to use the current cluster for the scan operator on views,
     // otherwise the planner will throw an Exception (different planners)
@@ -2355,24 +2366,36 @@ private RelNode copyNodeScan(RelNode scan) {
     if (!RelOptUtil.toString(calcitePreMVRewritingPlan).equals(RelOptUtil.toString(basePlan))) {
       // A rewriting was produced, we will check whether it was part of an incremental rebuild
       // to try to replace INSERT OVERWRITE by INSERT or MERGE
-      if (mvRebuildMode == MaterializationRebuildMode.INSERT_OVERWRITE_REBUILD &&
-          HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL)) {
-        // First we need to check if it is valid to convert to MERGE/INSERT INTO.
-        // If we succeed, we modify the plan and afterwards the AST.
-        // MV should be an acid table.
-        MaterializedViewRewritingRelVisitor visitor = new MaterializedViewRewritingRelVisitor();
-        visitor.go(basePlan);
-        if (visitor.isRewritingAllowed()) {
-          // Trigger rewriting to remove UNION branch with MV
-          if (visitor.isContainsAggregate()) {
-            basePlan = hepPlan(basePlan, false, mdProvider, null,
-                HepMatchOrder.TOP_DOWN, HiveAggregateIncrementalRewritingRule.INSTANCE);
-            mvRebuildMode = MaterializationRebuildMode.AGGREGATE_REBUILD;
-          } else {
-            basePlan = hepPlan(basePlan, false, mdProvider, null,
-                HepMatchOrder.TOP_DOWN, HiveNoAggregateIncrementalRewritingRule.INSTANCE);
-            mvRebuildMode = MaterializationRebuildMode.NO_AGGREGATE_REBUILD;
+      if (mvRebuildMode == MaterializationRebuildMode.INSERT_OVERWRITE_REBUILD) {
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL)) {
+          // First we need to check if it is valid to convert to MERGE/INSERT INTO.
+          // If we succeed, we modify the plan and afterwards the AST.
+          // MV should be an acid table.
+          MaterializedViewRewritingRelVisitor visitor = new MaterializedViewRewritingRelVisitor();
+          visitor.go(basePlan);
+          if (visitor.isRewritingAllowed()) {
+            // Trigger rewriting to remove UNION branch with MV
+            if (visitor.isContainsAggregate()) {
+              basePlan = hepPlan(basePlan, false, mdProvider, null,
+                  HepMatchOrder.TOP_DOWN, HiveAggregateIncrementalRewritingRule.INSTANCE);
+              mvRebuildMode = MaterializationRebuildMode.AGGREGATE_REBUILD;
+            } else {
+              basePlan = hepPlan(basePlan, false, mdProvider, null,
+                  HepMatchOrder.TOP_DOWN, HiveNoAggregateIncrementalRewritingRule.INSTANCE);
+              mvRebuildMode = MaterializationRebuildMode.NO_AGGREGATE_REBUILD;
+            }
+          }
+        }
+      } else if (useMaterializedViewsRegistry) {
+        // Before proceeding we need to check whether materialized views used are up-to-date
+        // wrt information in metastore
+        try {
+          if (!db.validateMaterializedViewsFromRegistry(getMaterializedViewsUsed(basePlan), tablesUsedQuery, getTxnMgr())) {
+            return calcitePreMVRewritingPlan;
           }
+        } catch (HiveException e) {
+          LOG.warn("Exception validating materialized views", e);
+          return calcitePreMVRewritingPlan;
         }
       }
       // Now we trigger some needed optimization rules again
@@ -2415,6 +2438,24 @@ public void visit(RelNode node, int ordinal, RelNode parent) {
     return tablesUsed;
   }
 
+  private List<Table> getMaterializedViewsUsed(RelNode plan) {
+    List<Table> materializedViewsUsed = new ArrayList<>();
+    new RelVisitor() {
+      @Override
+      public void visit(RelNode node, int ordinal, RelNode parent) {
+        if (node instanceof TableScan) {
+          TableScan ts = (TableScan) node;
+          Table table = ((RelOptHiveTable) ts.getTable()).getHiveTableMD();
+          if (table.isMaterializedView()) {
+            materializedViewsUsed.add(table);
+          }
+        }
+        super.visit(node, ordinal, parent);
+      }
+    }.go(plan);
+    return materializedViewsUsed;
+  }
+
   /**
    * Run the HEP Planner with the given rule set.
    *
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 99ce46e9a2..06c0132250 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -150,6 +150,7 @@
 import org.apache.hadoop.hive.ql.ddl.view.AlterMaterializedViewRewriteDesc;
 import org.apache.hadoop.hive.ql.ddl.view.DropMaterializedViewDesc;
 import org.apache.hadoop.hive.ql.ddl.view.DropViewDesc;
+import org.apache.hadoop.hive.ql.ddl.view.MaterializedViewUpdateDesc;
 import org.apache.hadoop.hive.ql.ddl.workloadmanagement.AlterPoolAddTriggerDesc;
 import org.apache.hadoop.hive.ql.ddl.workloadmanagement.AlterPoolDropTriggerDesc;
 import org.apache.hadoop.hive.ql.ddl.workloadmanagement.AlterResourcePlanDesc;
@@ -4371,7 +4372,22 @@ private void analyzeAlterMaterializedViewRewrite(String fqMvName, ASTNode ast) t
 
     inputs.add(new ReadEntity(materializedViewTable));
     outputs.add(new WriteEntity(materializedViewTable, WriteEntity.WriteType.DDL_EXCLUSIVE));
-    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterMVRewriteDesc)));
+
+    // Create task for alterMVRewriteDesc
+    DDLWork work = new DDLWork(getInputs(), getOutputs(), alterMVRewriteDesc);
+    Task targetTask = TaskFactory.get(work);
+
+    // Create task to update the rewrite flag as a dependent of the previous one
+    String tableName = alterMVRewriteDesc.getMaterializedViewName();
+    boolean retrieveAndInclude = alterMVRewriteDesc.isRewriteEnable();
+    boolean disableRewrite = !alterMVRewriteDesc.isRewriteEnable();
+    MaterializedViewUpdateDesc materializedViewUpdateDesc =
+        new MaterializedViewUpdateDesc(tableName, retrieveAndInclude, disableRewrite, false);
+    DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), materializedViewUpdateDesc);
+    targetTask.addDependentTask(TaskFactory.get(ddlWork, conf));
+
+    // Add root task
+    rootTasks.add(targetTask);
   }
 }
diff --git a/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q b/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q
index a3f437ed93..f617eaff57 100644
--- a/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q
+++ b/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q
@@ -32,8 +32,8 @@ SELECT * FROM src_txn_2 where value > 'val_220' and value < 'val_230';
 SELECT * FROM src_txn_2 where value > 'val_220' and value < 'val_230';
 
--- SHOULD CHOOSE partition_mv_4 SINCE IT IS THE MOST EFFICIENT
--- READING ONLY ONE PARTITION
+-- SHOULD CHOOSE partition_mv_1, partition_mv_3 OR partition_mv_4
+-- SINCE IT IS THE MOST EFFICIENT READING ONLY ONE PARTITION
 EXPLAIN
 SELECT * FROM src_txn_2 where key > 224 and key < 226;
 
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out
index 53f723387e..93a1103465 100644
---
a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out @@ -272,6 +272,7 @@ POSTHOOK: Input: default@cmv_mat_view2_n4 POSTHOOK: Output: default@cmv_mat_view2_n4 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -279,6 +280,11 @@ STAGE PLANS: name: default.cmv_mat_view2_n4 enable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view2_n4 + retrieveAndInclude: true + PREHOOK: query: alter materialized view cmv_mat_view2_n4 enable rewrite PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view2_n4 diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out index 77bc231b11..1d6ac62429 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out @@ -467,6 +467,7 @@ POSTHOOK: Input: default@cmv_mat_view_n5 POSTHOOK: Output: default@cmv_mat_view_n5 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -474,6 +475,11 @@ STAGE PLANS: name: default.cmv_mat_view_n5 enable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view_n5 + retrieveAndInclude: true + PREHOOK: query: ALTER MATERIALIZED VIEW cmv_mat_view_n5 ENABLE REWRITE PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view_n5 diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out index 25c89a546c..62b74da7b2 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out @@ -147,6 +147,7 @@ POSTHOOK: Input: default@cmv_mat_view2 POSTHOOK: Output: default@cmv_mat_view2 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -154,6 +155,11 @@ STAGE PLANS: name: default.cmv_mat_view2 disable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view2 + disableRewrite: true + PREHOOK: query: alter materialized view cmv_mat_view2 disable rewrite PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view2 @@ -291,6 +297,7 @@ POSTHOOK: Input: default@cmv_mat_view2 POSTHOOK: Output: default@cmv_mat_view2 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -298,6 +305,11 @@ STAGE PLANS: name: default.cmv_mat_view2 enable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view2 + retrieveAndInclude: true + PREHOOK: query: alter materialized view cmv_mat_view2 enable rewrite PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view2 diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out index 7da22c0616..5f7c4f6b3f 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out +++ 
b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out @@ -467,6 +467,7 @@ POSTHOOK: Input: default@cmv_mat_view_n3 POSTHOOK: Output: default@cmv_mat_view_n3 STAGE DEPENDENCIES: Stage-0 is a root stage + Stage-1 depends on stages: Stage-0 STAGE PLANS: Stage: Stage-0 @@ -474,6 +475,11 @@ STAGE PLANS: name: default.cmv_mat_view_n3 enable: true + Stage: Stage-1 + Materialized View Update + name: default.cmv_mat_view_n3 + retrieveAndInclude: true + PREHOOK: query: ALTER MATERIALIZED VIEW cmv_mat_view_n3 ENABLE REWRITE PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE PREHOOK: Input: default@cmv_mat_view_n3 diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out index d9c76fbf06..6726c1e9d0 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out @@ -525,13 +525,13 @@ POSTHOOK: Input: default@src_txn_2 PREHOOK: query: EXPLAIN SELECT * FROM src_txn_2 where key > 224 and key < 226 PREHOOK: type: QUERY -PREHOOK: Input: default@partition_mv_4 +PREHOOK: Input: default@partition_mv_1 PREHOOK: Input: default@src_txn_2 #### A masked pattern was here #### POSTHOOK: query: EXPLAIN SELECT * FROM src_txn_2 where key > 224 and key < 226 POSTHOOK: type: QUERY -POSTHOOK: Input: default@partition_mv_4 +POSTHOOK: Input: default@partition_mv_1 POSTHOOK: Input: default@src_txn_2 #### A masked pattern was here #### STAGE DEPENDENCIES: @@ -543,7 +543,7 @@ STAGE PLANS: limit: -1 Processor Tree: TableScan - alias: default.partition_mv_4 + alias: default.partition_mv_1 filterExpr: ((UDFToDouble(key) > 224.0D) and (UDFToDouble(key) < 226.0D)) (type: boolean) Filter Operator predicate: ((UDFToDouble(key) > 224.0D) and (UDFToDouble(key) < 226.0D)) (type: boolean) @@ -554,14 +554,14 @@ STAGE PLANS: PREHOOK: query: SELECT * FROM src_txn_2 where key > 223 and key < 225 PREHOOK: type: QUERY -PREHOOK: Input: default@partition_mv_4 -PREHOOK: Input: default@partition_mv_4@key=224 +PREHOOK: Input: default@partition_mv_1 +PREHOOK: Input: default@partition_mv_1@key=224 PREHOOK: Input: default@src_txn_2 #### A masked pattern was here #### POSTHOOK: query: SELECT * FROM src_txn_2 where key > 223 and key < 225 POSTHOOK: type: QUERY -POSTHOOK: Input: default@partition_mv_4 -POSTHOOK: Input: default@partition_mv_4@key=224 +POSTHOOK: Input: default@partition_mv_1 +POSTHOOK: Input: default@partition_mv_1@key=224 POSTHOOK: Input: default@src_txn_2 #### A masked pattern was here #### 224 val_224
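With the patch in place, the refresh cadence is driven by the new property. A short example of setting and reading it with the same APIs the patch uses (assumes only that HiveConf is on the classpath):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class RegistryRefreshPeriodExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Refresh the registry every 5 minutes instead of the default 60s.
        conf.set("hive.server2.materializedviews.registry.refresh.period", "300s");
        long periodSeconds = HiveConf.getTimeVar(conf,
            HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH, TimeUnit.SECONDS);
        System.out.println(periodSeconds); // 300
      }
    }

A lower period narrows the window in which a materialization created on another HS2 instance is invisible to this one, at the cost of more frequent metastore scans.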