diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index c71c540401..3ed2cf398a 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -555,6 +555,7 @@ minillaplocal.query.files=\
   materialized_view_describe.q,\
   materialized_view_drop.q,\
   materialized_view_rebuild.q,\
+  materialized_view_rewrite_empty.q,\
   materialized_view_rewrite_1.q,\
   materialized_view_rewrite_2.q,\
   materialized_view_rewrite_3.q,\
diff --git a/ql/src/test/queries/clientpositive/materialized_view_rewrite_empty.q b/ql/src/test/queries/clientpositive/materialized_view_rewrite_empty.q
new file mode 100644
index 0000000000..e5daa8dc78
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/materialized_view_rewrite_empty.q
@@ -0,0 +1,28 @@
+-- SORT_QUERY_RESULTS
+
+set hive.vectorized.execution.enabled=false;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.strict.checks.cartesian.product=false;
+set hive.stats.fetch.column.stats=true;
+set hive.materializedview.rewriting=true;
+
+create table emps_mv_rewrite_empty (
+  empid int,
+  deptno int,
+  name varchar(256),
+  salary float,
+  commission int)
+stored as orc TBLPROPERTIES ('transactional'='true');
+analyze table emps_mv_rewrite_empty compute statistics for columns;
+
+create materialized view emps_mv_rewrite_empty_mv1 as
+select * from emps_mv_rewrite_empty where empid < 150;
+
+explain
+select * from emps_mv_rewrite_empty where empid < 120;
+
+select * from emps_mv_rewrite_empty where empid < 120;
+
+drop materialized view emps_mv_rewrite_empty_mv1;
+drop table emps_mv_rewrite_empty;
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_rewrite_empty.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_rewrite_empty.q.out
new file mode 100644
index 0000000000..b33d8c3f2d
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_rewrite_empty.q.out
@@ -0,0 +1,89 @@
+PREHOOK: query: create table emps_mv_rewrite_empty (
+  empid int,
+  deptno int,
+  name varchar(256),
+  salary float,
+  commission int)
+stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@emps_mv_rewrite_empty
+POSTHOOK: query: create table emps_mv_rewrite_empty (
+  empid int,
+  deptno int,
+  name varchar(256),
+  salary float,
+  commission int)
+stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@emps_mv_rewrite_empty
+PREHOOK: query: analyze table emps_mv_rewrite_empty compute statistics for columns
+PREHOOK: type: ANALYZE_TABLE
+PREHOOK: Input: default@emps_mv_rewrite_empty
+PREHOOK: Output: default@emps_mv_rewrite_empty
+#### A masked pattern was here ####
+POSTHOOK: query: analyze table emps_mv_rewrite_empty compute statistics for columns
+POSTHOOK: type: ANALYZE_TABLE
+POSTHOOK: Input: default@emps_mv_rewrite_empty
+POSTHOOK: Output: default@emps_mv_rewrite_empty
+#### A masked pattern was here ####
+PREHOOK: query: create materialized view emps_mv_rewrite_empty_mv1 as
+select * from emps_mv_rewrite_empty where empid < 150
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@emps_mv_rewrite_empty
+PREHOOK: Output: database:default
+PREHOOK: Output: default@emps_mv_rewrite_empty_mv1
+POSTHOOK: query: create materialized view emps_mv_rewrite_empty_mv1 as
+select * from emps_mv_rewrite_empty where empid < 150
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@emps_mv_rewrite_empty
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@emps_mv_rewrite_empty_mv1
+PREHOOK: query: explain
+select * from emps_mv_rewrite_empty where empid < 120
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from emps_mv_rewrite_empty where empid < 120
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: emps_mv_rewrite_empty
+          Filter Operator
+            predicate: (empid < 120) (type: boolean)
+            Select Operator
+              expressions: empid (type: int), deptno (type: int), name (type: varchar(256)), salary (type: float), commission (type: int)
+              outputColumnNames: _col0, _col1, _col2, _col3, _col4
+              ListSink
+
+PREHOOK: query: select * from emps_mv_rewrite_empty where empid < 120
+PREHOOK: type: QUERY
+PREHOOK: Input: default@emps_mv_rewrite_empty
+#### A masked pattern was here ####
+POSTHOOK: query: select * from emps_mv_rewrite_empty where empid < 120
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@emps_mv_rewrite_empty
+#### A masked pattern was here ####
+PREHOOK: query: drop materialized view emps_mv_rewrite_empty_mv1
+PREHOOK: type: DROP_MATERIALIZED_VIEW
+PREHOOK: Input: default@emps_mv_rewrite_empty_mv1
+PREHOOK: Output: default@emps_mv_rewrite_empty_mv1
+POSTHOOK: query: drop materialized view emps_mv_rewrite_empty_mv1
+POSTHOOK: type: DROP_MATERIALIZED_VIEW
+POSTHOOK: Input: default@emps_mv_rewrite_empty_mv1
+POSTHOOK: Output: default@emps_mv_rewrite_empty_mv1
+PREHOOK: query: drop table emps_mv_rewrite_empty
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@emps_mv_rewrite_empty
+PREHOOK: Output: default@emps_mv_rewrite_empty
+POSTHOOK: query: drop table emps_mv_rewrite_empty
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@emps_mv_rewrite_empty
+POSTHOOK: Output: default@emps_mv_rewrite_empty
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
index 99c5abcf59..fc644f0b63 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
@@ -360,6 +360,15 @@ private void enrichWithInvalidationInfo(Materialization materialization) {
       final ConcurrentSkipListMap usedTableModifications =
           tableModifications.get(qNameTableUsed);
+      if (usedTableModifications == null) {
+        // This is not necessarily an error, since the table may be empty. To be safe,
+        // instead of including this materialized view, we just log the information and
+        // skip it (if the table is really empty, it will not matter for performance anyway).
+        LOG.warn("No information found in invalidation cache for table {}, possible tables are: {}",
+            qNameTableUsed, tableModifications.keySet());
+        materialization.setInvalidationTime(Long.MIN_VALUE);
+        return;
+      }
       final ConcurrentSkipListSet usedUDTableModifications =
           updateDeleteTableModifications.get(qNameTableUsed);
       final Entry tn = usedTableModifications.higherEntry(tableMaterializationTxnList.getHighWatermark());
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d53279ed63..361ede54ef 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -838,11 +838,7 @@ public void commitTxn(CommitTxnRequest rqst)
     throws NoSuchTxnException, TxnAbortedException, MetaException {
     MaterializationsRebuildLockHandler materializationsRebuildLockHandler =
         MaterializationsRebuildLockHandler.get();
-    String fullyQualifiedName = null;
-    String dbName = null;
-    String tblName = null;
-    long writeId = 0L;
-    long timestamp = 0L;
+    List<TransactionRegistryInfo> txnComponents = new ArrayList<>();
     boolean isUpdateDelete = false;
     long txnid = rqst.getTxnid();
     long sourceTxnId = -1;
@@ -1007,12 +1003,10 @@ public void commitTxn(CommitTxnRequest rqst)
         s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid;
         LOG.debug("Going to extract table modification information for invalidation cache <" + s + ">");
         rs = stmt.executeQuery(s);
-        if (rs.next()) {
-          dbName = rs.getString(1);
-          tblName = rs.getString(2);
-          fullyQualifiedName = Warehouse.getQualifiedName(dbName, tblName);
-          writeId = rs.getLong(3);
-          timestamp = rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime();
+        while (rs.next()) {
+          // We only enter this loop if the transaction actually affected any table
+          txnComponents.add(new TransactionRegistryInfo(rs.getString(1), rs.getString(2),
+              rs.getLong(3), rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()));
         }
         s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
@@ -1042,18 +1036,22 @@ public void commitTxn(CommitTxnRequest rqst)
         MaterializationsInvalidationCache materializationsInvalidationCache =
             MaterializationsInvalidationCache.get();
-        if (materializationsInvalidationCache.containsMaterialization(dbName, tblName) &&
-            !materializationsRebuildLockHandler.readyToCommitResource(dbName, tblName, txnid)) {
-          throw new MetaException(
-              "Another process is rebuilding the materialized view " + fullyQualifiedName);
+        for (TransactionRegistryInfo info : txnComponents) {
+          if (materializationsInvalidationCache.containsMaterialization(info.dbName, info.tblName) &&
+              !materializationsRebuildLockHandler.readyToCommitResource(info.dbName, info.tblName, txnid)) {
+            throw new MetaException(
+                "Another process is rebuilding the materialized view " + info.fullyQualifiedName);
+          }
         }
         LOG.debug("Going to commit");
         close(rs);
         dbConn.commit();
 
         // Update registry with modifications
-        materializationsInvalidationCache.notifyTableModification(
-            dbName, tblName, writeId, timestamp, isUpdateDelete);
+        for (TransactionRegistryInfo info : txnComponents) {
+          materializationsInvalidationCache.notifyTableModification(
+              info.dbName, info.tblName, info.writeId, info.timestamp, isUpdateDelete);
+        }
       } catch (SQLException e) {
        LOG.debug("Going to rollback");
        rollbackDBConn(dbConn);
@@ -1064,8 +1062,8 @@ public void commitTxn(CommitTxnRequest rqst)
         close(commitIdRs);
         close(lockHandle, stmt, dbConn);
         unlockInternal();
-        if (fullyQualifiedName != null) {
-          materializationsRebuildLockHandler.unlockResource(dbName, tblName, txnid);
+        for (TransactionRegistryInfo info : txnComponents) {
+          materializationsRebuildLockHandler.unlockResource(info.dbName, info.tblName, txnid);
         }
       }
     } catch (RetryException e) {
@@ -4783,4 +4781,21 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
       throw new UnsupportedOperationException();
     }
   };
+
+  private class TransactionRegistryInfo {
+    final String dbName;
+    final String tblName;
+    final String fullyQualifiedName;
+    final long writeId;
+    final long timestamp;
+
+    public TransactionRegistryInfo(String dbName, String tblName, long writeId, long timestamp) {
+      this.dbName = dbName;
+      this.tblName = tblName;
+      this.fullyQualifiedName = Warehouse.getQualifiedName(dbName, tblName);
+      this.writeId = writeId;
+      this.timestamp = timestamp;
+    }
+  }
+
 }