diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
index 1f304782ff..d1f36945fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,6 +78,10 @@ private List distributeColNames; // only used for materialized views
   private List distributeCols; // only used for materialized views
   private ReplicationSpec replicationSpec = null;
+  private Long initialMmWriteId; // Initial MM write ID for CMV and import.
+  // The FSOP configuration for the FSOP that is going to write initial data during cmv.
+  // This is not needed beyond compilation, so it is transient.
+  private transient FileSinkDesc writer;
 
   private String ownerName = null;
 
   /**
@@ -458,6 +463,24 @@ public Table toTable(HiveConf conf) throws HiveException {
     return tbl;
   }
 
+  public void setInitialMmWriteId(Long mmWriteId) {
+    this.initialMmWriteId = mmWriteId;
+  }
+
+  public Long getInitialMmWriteId() {
+    return initialMmWriteId;
+  }
+
+  public FileSinkDesc getAndUnsetWriter() {
+    FileSinkDesc fsd = writer;
+    writer = null;
+    return fsd;
+  }
+
+  public void setWriter(FileSinkDesc writer) {
+    this.writer = writer;
+  }
+
   public void setOwnerName(String ownerName) {
     this.ownerName = ownerName;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 30d37914d0..2257cc14d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7296,7 +7296,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     LoadTableDesc ltd = null;
     ListBucketingCtx lbCtx = null;
     Map partSpec = null;
-    boolean isMmTable = false, isMmCtas = false;
+    boolean isMmTable = false, isMmCreate = false;
     Long writeId = null;
     HiveTxnManager txnMgr = getTxnMgr();
 
@@ -7568,6 +7568,9 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     List fileSinkColInfos = null;
     List sortColInfos = null;
     List distributeColInfos = null;
+    String dbName = null;
+    String tableName = null;
+    Map<String, String> tblProps = null;
     CreateTableDesc tblDesc = qb.getTableDesc();
     CreateViewDesc viewDesc = qb.getViewDesc();
     if (tblDesc != null) {
@@ -7577,31 +7580,16 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       fileSinkColInfos = new ArrayList<>();
       destTableIsTemporary = tblDesc.isTemporary();
       destTableIsMaterialization = tblDesc.isMaterialization();
-      if (AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())) {
-        try {
-          if (ctx.getExplainConfig() != null) {
-            writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
-          } else {
-            String dbName = tblDesc.getDatabaseName();
-            String tableName = tblDesc.getTableName();
-
-            // CreateTableDesc stores table name as db.table. So, need to decode it before allocating
-            // write id.
-            if (tableName.contains(".")) {
-              String[] names = Utilities.getDbTableName(tableName);
-              dbName = names[0];
-              tableName = names[1];
-            }
-            writeId = txnMgr.getTableWriteId(dbName, tableName);
-          }
-        } catch (LockException ex) {
-          throw new SemanticException("Failed to allocate write Id", ex);
-        }
-        if (AcidUtils.isInsertOnlyTable(tblDesc.getTblProps(), true)) {
-          isMmTable = isMmCtas = true;
-          tblDesc.setInitialMmWriteId(writeId);
-        }
-      }
+      dbName = tblDesc.getDatabaseName();
+      tableName = tblDesc.getTableName();
+      // CreateTableDesc stores table name as db.table. So, need to decode it before allocating
+      // write id.
+      if (tableName.contains(".")) {
+        String[] names = Utilities.getDbTableName(tableName);
+        dbName = names[0];
+        tableName = names[1];
+      }
+      tblProps = tblDesc.getTblProps();
     } else if (viewDesc != null) {
       fieldSchemas = new ArrayList<>();
       partitionColumns = new ArrayList<>();
@@ -7615,6 +7603,30 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       distributeColInfos = new ArrayList<>();
       destTableIsTemporary = false;
       destTableIsMaterialization = false;
+      String[] names = Utilities.getDbTableName(viewDesc.getViewName());
+      dbName = names[0];
+      tableName = names[1];
+      tblProps = viewDesc.getTblProps();
+    }
+
+    if (tblProps != null && AcidUtils.isTablePropertyTransactional(tblProps)) {
+      try {
+        if (ctx.getExplainConfig() != null) {
+          writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
+        } else {
+          writeId = txnMgr.getTableWriteId(dbName, tableName);
+        }
+      } catch (LockException ex) {
+        throw new SemanticException("Failed to allocate write Id", ex);
+      }
+      if (AcidUtils.isInsertOnlyTable(tblProps, true)) {
+        isMmTable = isMmCreate = true;
+        if (tblDesc != null) {
+          tblDesc.setInitialMmWriteId(writeId);
+        } else {
+          viewDesc.setInitialMmWriteId(writeId);
+        }
+      }
     }
 
     if (isLocal) {
@@ -7797,7 +7809,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
           colTypes,
           destTableIsFullAcid ?//there is a change here - prev version had 'transactional', one before 'acid'
               Operation.INSERT : Operation.NOT_ACID,
-          isMmCtas));
+          isMmCreate));
       if (!outputs.add(new WriteEntity(destinationPath, !isDfsDir, isDestTempFile))) {
         throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
             .getMsg(destinationPath.toUri().toString()));
@@ -7855,10 +7867,14 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition,
         destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, destinationTable, writeId, isMmCtas, destType, qb);
-    if (isMmCtas) {
+        canBeMerged, destinationTable, writeId, isMmCreate, destType, qb);
+    if (isMmCreate) {
       // Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
-      tableDesc.setWriter(fileSinkDesc);
+      if (tableDesc != null) {
+        tableDesc.setWriter(fileSinkDesc);
+      } else {
+        createVwDesc.setWriter(fileSinkDesc);
+      }
     }
 
     if (fileSinkDesc.getInsertOverwrite()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index cdd212cd34..ec46280627 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -446,22 +446,25 @@ private String extractTableFullName(StatsTask tsk) throws SemanticException {
   private void setLoadFileLocation(
       final ParseContext pCtx, LoadFileDesc lfd) throws SemanticException {
     // CTAS; make the movetask's destination directory the table's destination.
-    Long txnIdForCtas = null;
-    int stmtId = 0; // CTAS cannot be part of multi-txn stmt
-    FileSinkDesc dataSinkForCtas = null;
+    Long txnId = null;
+    int stmtId = 0; // CTAS or CMV cannot be part of multi-txn stmt
+    FileSinkDesc dataSink = null;
     String loc = null;
     if (pCtx.getQueryProperties().isCTAS()) {
       CreateTableDesc ctd = pCtx.getCreateTable();
-      dataSinkForCtas = ctd.getAndUnsetWriter();
-      txnIdForCtas = ctd.getInitialMmWriteId();
+      dataSink = ctd.getAndUnsetWriter();
+      txnId = ctd.getInitialMmWriteId();
       loc = ctd.getLocation();
     } else {
-      loc = pCtx.getCreateViewDesc().getLocation();
+      CreateViewDesc cmv = pCtx.getCreateViewDesc();
+      dataSink = cmv.getAndUnsetWriter();
+      txnId = cmv.getInitialMmWriteId();
+      loc = cmv.getLocation();
     }
     Path location = (loc == null) ? getDefaultCtasLocation(pCtx) : new Path(loc);
-    if (txnIdForCtas != null) {
-      dataSinkForCtas.setDirName(location);
-      location = new Path(location, AcidUtils.deltaSubdir(txnIdForCtas, txnIdForCtas, stmtId));
+    if (txnId != null) {
+      dataSink.setDirName(location);
+      location = new Path(location, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
       lfd.setSourcePath(location);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("Setting MM CTAS to " + location);
diff --git a/ql/src/test/queries/clientpositive/materialized_view_create.q b/ql/src/test/queries/clientpositive/materialized_view_create.q
index c65cde5918..77cc60fd5d 100644
--- a/ql/src/test/queries/clientpositive/materialized_view_create.q
+++ b/ql/src/test/queries/clientpositive/materialized_view_create.q
@@ -1,3 +1,4 @@
+--! qt:dataset:src
 set hive.vectorized.execution.enabled=false;
 
 create table cmv_basetable_n4 (a int, b varchar(256), c decimal(10,2));
@@ -40,3 +41,16 @@ drop materialized view cmv_mat_view2_n1;
 drop materialized view cmv_mat_view3;
 drop materialized view cmv_mat_view4;
 drop materialized view cmv_mat_view5;
+
+-- ACID CMV
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.stats.autogather=false;
+
+create materialized view acid_cmv_part disable rewrite partitioned on (k)
+  stored as orc TBLPROPERTIES ('transactional'='true')
+  as select key k, value from src order by k limit 5;
+select k, value from acid_cmv_part;
+
+explain formatted
+select k, value from acid_cmv_part;
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out
index 105203dc33..1eb5b6949b 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out
@@ -292,3 +292,60 @@ POSTHOOK: query: drop materialized view cmv_mat_view5
 POSTHOOK: type: DROP_MATERIALIZED_VIEW
 POSTHOOK: Input: default@cmv_mat_view5
 POSTHOOK: Output: default@cmv_mat_view5
+PREHOOK: query: create materialized view acid_cmv_part disable rewrite partitioned on (k)
+  stored as orc TBLPROPERTIES ('transactional'='true')
+  as select key k, value from src order by k limit 5
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@acid_cmv_part
+PREHOOK: Output: default@acid_cmv_part
+POSTHOOK: query: create materialized view acid_cmv_part disable rewrite partitioned on (k)
+  stored as orc TBLPROPERTIES ('transactional'='true')
+  as select key k, value from src order by k limit 5
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@acid_cmv_part
+POSTHOOK: Output: default@acid_cmv_part@k=0
+POSTHOOK: Output: default@acid_cmv_part@k=10
+POSTHOOK: Output: default@acid_cmv_part@k=100
+POSTHOOK: Lineage: acid_cmv_part PARTITION(k=0).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: acid_cmv_part PARTITION(k=100).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: acid_cmv_part PARTITION(k=10).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select k, value from acid_cmv_part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_cmv_part
+PREHOOK: Input: default@acid_cmv_part@k=0
+PREHOOK: Input: default@acid_cmv_part@k=10
+PREHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+POSTHOOK: query: select k, value from acid_cmv_part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_cmv_part
+POSTHOOK: Input: default@acid_cmv_part@k=0
+POSTHOOK: Input: default@acid_cmv_part@k=10
+POSTHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+0	val_0
+0	val_0
+0	val_0
+10	val_10
+100	val_100
+PREHOOK: query: explain formatted
+select k, value from acid_cmv_part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_cmv_part
+PREHOOK: Input: default@acid_cmv_part@k=0
+PREHOOK: Input: default@acid_cmv_part@k=10
+PREHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+POSTHOOK: query: explain formatted
+select k, value from acid_cmv_part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_cmv_part
+POSTHOOK: Input: default@acid_cmv_part@k=0
+POSTHOOK: Input: default@acid_cmv_part@k=10
+POSTHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+{"CBOPlan":"{\n \"rels\": [\n {\n \"id\": \"0\",\n \"relOp\": \"org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan\",\n \"table\": [\n \"default\",\n \"acid_cmv_part\"\n ],\n \"table:alias\": \"acid_cmv_part\",\n \"inputs\": [],\n \"rowCount\": 202.0,\n \"avgRowSize\": 200.0,\n \"rowType\": [\n {\n \"type\": \"VARCHAR\",\n \"nullable\": true,\n \"precision\": 2147483647,\n \"name\": \"value\"\n },\n {\n \"type\": \"VARCHAR\",\n \"nullable\": true,\n \"precision\": 2147483647,\n \"name\": \"k\"\n },\n {\n \"type\": \"BIGINT\",\n \"nullable\": true,\n \"name\": \"BLOCK__OFFSET__INSIDE__FILE\"\n },\n {\n \"type\": \"VARCHAR\",\n \"nullable\": true,\n \"precision\": 2147483647,\n \"name\": \"INPUT__FILE__NAME\"\n },\n {\n \"fields\": [\n {\n \"type\": \"BIGINT\",\n \"nullable\": true,\n \"name\": \"writeid\"\n },\n {\n \"type\": \"INTEGER\",\n \"nullable\": true,\n \"name\": \"bucketid\"\n },\n {\n \"type\": \"BIGINT\",\n \"nullable\": true,\n \"name\": \"rowid\"\n }\n ],\n \"name\": \"ROW__ID\"\n }\n ],\n \"partitionColumns\": [\n \"k\"\n ],\n \"colStats\": [\n {\n \"name\": \"value\",\n \"ndv\": 40\n },\n {\n \"name\": \"k\",\n \"ndv\": 3\n }\n ]\n },\n {\n \"id\": \"1\",\n \"relOp\": \"org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject\",\n \"fields\": [\n \"k\",\n \"value\"\n ],\n \"exprs\": [\n {\n \"input\": 1,\n \"name\": \"$1\"\n },\n {\n \"input\": 0,\n \"name\": \"$0\"\n }\n ],\n \"rowCount\": 202.0\n }\n ]\n}","optimizedSQL":"SELECT `k`, `value`\nFROM `default`.`acid_cmv_part`","cboInfo":"Plan optimized by CBO.","STAGE DEPENDENCIES":{"Stage-0":{"ROOT STAGE":"TRUE"}},"STAGE PLANS":{"Stage-0":{"Fetch Operator":{"limit:":"-1","Processor Tree:":{"TableScan":{"alias:":"acid_cmv_part","columns:":["value"],"database:":"default","table:":"acid_cmv_part","isTempTable:":"false","OperatorId:":"TS_0","children":{"Select Operator":{"expressions:":"k (type: string), value (type: string)","columnExprMap:":{"_col0":"k","_col1":"value"},"outputColumnNames:":["_col0","_col1"],"OperatorId:":"SEL_1","children":{"ListSink":{"OperatorId:":"LIST_SINK_3"}}}}}}}}}}
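
The snippet below is a small, self-contained sketch (not part of the patch above) of the hand-off these changes wire up for an insert-only (MM) CREATE MATERIALIZED VIEW: semantic analysis allocates the initial write id and parks the file sink descriptor on the view descriptor, and task compilation later claims that descriptor exactly once and redirects both the sink and the load source path into the initial delta directory under the table location. The classes ViewDesc, Writer, and MmCreateHandoffSketch are simplified stand-ins rather than Hive's CreateViewDesc and FileSinkDesc, and the delta directory name is formatted inline purely for illustration; in Hive that naming belongs to AcidUtils.deltaSubdir.

// Minimal sketch of the write-id / writer hand-off added by this patch.
// ViewDesc and Writer are simplified stand-ins, not Hive's CreateViewDesc/FileSinkDesc.
import java.nio.file.Path;
import java.nio.file.Paths;

public class MmCreateHandoffSketch {

  /** Stand-in for FileSinkDesc: only the destination directory matters here. */
  static class Writer {
    Path dirName;
  }

  /** Stand-in for CreateViewDesc with the two members the patch introduces. */
  static class ViewDesc {
    private Long initialMmWriteId;    // initial MM write id for CMV
    private transient Writer writer;  // compilation-only state, hence transient in the patch

    void setInitialMmWriteId(Long writeId) { this.initialMmWriteId = writeId; }
    Long getInitialMmWriteId() { return initialMmWriteId; }
    void setWriter(Writer writer) { this.writer = writer; }
    Writer getAndUnsetWriter() {      // hand the writer over at most once
      Writer fsd = writer;
      writer = null;
      return fsd;
    }
  }

  public static void main(String[] args) {
    // "Semantic analysis": allocate a write id and stash the sink on the descriptor.
    ViewDesc cmv = new ViewDesc();
    cmv.setInitialMmWriteId(1L);
    cmv.setWriter(new Writer());

    // "Task compilation": claim the writer once and point the load at the initial delta dir.
    Long writeId = cmv.getInitialMmWriteId();
    Writer sink = cmv.getAndUnsetWriter();
    Path tableLocation = Paths.get("/warehouse/acid_cmv_part"); // hypothetical location
    if (writeId != null && sink != null) {
      sink.dirName = tableLocation; // the FSOP writes under the table location ...
      // ... and the load source becomes the initial delta directory. The name format below
      // is illustrative only; Hive's AcidUtils.deltaSubdir owns the real naming.
      int stmtId = 0; // CTAS or CMV cannot be part of a multi-statement txn
      Path delta = tableLocation.resolve(
          String.format("delta_%07d_%07d_%04d", writeId, writeId, stmtId));
      System.out.println("load source path: " + delta);
    }

    // A second claim returns null, so the sink cannot be redirected twice.
    System.out.println("second claim: " + cmv.getAndUnsetWriter());
  }
}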