diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 96b074dfa3..5ba5adcf4e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -662,4 +662,44 @@ public void testMultiDBTxn() throws Throwable {
     replica.run("drop database " + dbName1 + " cascade");
     replica.run("drop database " + dbName2 + " cascade");
   }
+
+  /**
+   * Verifies that CTAS into micro-managed (insert-only transactional) tables, both partitioned
+   * and non-partitioned, is correctly replicated via an incremental dump/load on top of a
+   * bootstrapped non-acid source table.
+   */
+  @Test
+  public void testCtasMmTable() throws Throwable {
+    String[] resultArray = new String[]{ "1", "3" };
+    String tableProperties = "\"transactional\"=\"true\", \"transactional_properties\"=\"insert_only\"";
+
+    // Bootstrap: create a non-acid source table and replicate it.
+    WarehouseInstance.Tuple bootStrapDump = primary.run("use " + primaryDbName)
+            .run("create table t1_nonacid (a int, b int)")
+            .run("insert into t1_nonacid values (1, 2), (3, 4)")
+            .dump(primaryDbName, null);
+
+    replica.loadWithoutExplain(replicatedDbName, bootStrapDump.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("select a from t1_nonacid order by a")
+            .verifyResults(resultArray);
+
+    // Incremental: CTAS both a non-partitioned and a partitioned MM table from the non-acid
+    // table and replicate them.
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+            .run("create table t2_mm_nonptn tblproperties (" + tableProperties + ") as select * from t1_nonacid")
+            .run("select a from t2_mm_nonptn order by a")
+            .verifyResults(resultArray)
+            .run("create table t3_mm_ptn partitioned by (a) tblproperties ("
+                    + tableProperties + ") as select * from t1_nonacid")
+            .run("select a from t3_mm_ptn order by a")
+            .verifyResults(resultArray)
+            .dump(primaryDbName, bootStrapDump.lastReplicationId);
+
+    replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("select a from t2_mm_nonptn order by a")
+            .verifyResults(resultArray)
+            .run("select a from t3_mm_ptn order by a")
+            .verifyResults(resultArray);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java
index ee32f4c9b4..4e0dc51075 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java
@@ -897,8 +897,8 @@ public Table toTable(HiveConf conf) throws HiveException {
     if (colStats != null) {
       ColumnStatisticsDesc colStatsDesc = new ColumnStatisticsDesc(colStats.getStatsDesc());
       colStatsDesc.setCatName(tbl.getCatName());
-      colStatsDesc.setDbName(getTableName());
-      colStatsDesc.setDbName(getDatabaseName());
+      colStatsDesc.setDbName(tbl.getDbName());
+      colStatsDesc.setTableName(tbl.getTableName());
       tbl.getTTable().setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj()));
       // Statistics will have an associated write Id for a transactional table. We need it to
       // update column statistics.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 0e58fe20b4..5d269b8fb7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7483,7 +7483,17 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       if (ctx.getExplainConfig() != null) {
         writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
       } else {
-        writeId = txnMgr.getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName());
+        String dbName = tblDesc.getDatabaseName();
+        String tableName = tblDesc.getTableName();
+
+        // CreateTableDesc may store the table name as "db.table"; decode it into separate
+        // database and table parts before allocating the write id.
+        if ((dbName == null) || tableName.contains(".")) {
+          String[] names = Utilities.getDbTableName(tableName);
+          dbName = names[0];
+          tableName = names[1];
+        }
+        writeId = txnMgr.getTableWriteId(dbName, tableName);
       }
     } catch (LockException ex) {
       throw new SemanticException("Failed to allocate write Id", ex);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
index 1f206984ff..dbcbe4c8af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
@@ -110,6 +110,10 @@ public void set(String replState, String dbName, String tableName, Map