diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 689ca76..70e1aa7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -620,4 +620,59 @@ public void testReplLoadFromSourceUsingWithClause() throws Throwable {
         .run("show functions like '" + replicatedDbName + "*'")
         .verifyResult(null);
   }
+
+  @Test
+  public void testIncrementalReplWithEventsBatchHavingDropCreateTable() throws Throwable {
+    // Bootstrap dump with empty db
+    WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+
+    // Bootstrap load in replica
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+        .status(replicatedDbName)
+        .verifyResult(bootstrapTuple.lastReplicationId);
+
+    // First incremental dump
+    WarehouseInstance.Tuple firstIncremental = primary.run("use " + primaryDbName)
+        .run("create table table1 (i int)")
+        .run("create table table2 (id int) partitioned by (country string)")
+        .run("insert into table1 values (1)")
+        .run("insert into table2 partition(country='india') values(1)")
+        .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+    // Second incremental dump
+    WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
+        .run("drop table table1")
+        .run("drop table table2")
+        .run("create table table2 (id int) partitioned by (country string)")
+        .run("alter table table2 add partition(country='india')")
+        .run("alter table table2 drop partition(country='india')")
+        .run("insert into table2 partition(country='us') values(2)")
+        .run("create table table1 (i int)")
+        .run("insert into table1 values (2)")
+        .dump(primaryDbName, firstIncremental.lastReplicationId);
+
+    // First incremental load
+    replica.load(replicatedDbName, firstIncremental.dumpLocation)
+        .status(replicatedDbName)
+        .verifyResult(firstIncremental.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] {"table1", "table2"})
+        .run("select * from table1")
+        .verifyResults(new String[] {"1"})
+        .run("select id from table2 order by id")
+        .verifyResults(new String[] {"1"});
+
+    // Second incremental load
+    replica.load(replicatedDbName, secondIncremental.dumpLocation)
+        .status(replicatedDbName)
+        .verifyResult(secondIncremental.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] {"table1", "table2"})
+        .run("select * from table1")
+        .verifyResults(new String[] {"2"})
+        .run("select id from table2 order by id")
+        .verifyResults(new String[] {"2"});
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 7c8020d..accdc1f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -231,6 +231,11 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions) throws Throwable {
     String replStatusCmd = "REPL STATUS " + replicatedDbName;
     if (!withClauseOptions.isEmpty()) {
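Reviewer note: the second incremental batch in the test above deliberately packs drop_table -> create_table for the same table names into a single REPL LOAD cycle. The failure mode being guarded against is a retry: if a previous incremental load applied a CREATE_TABLE event but died before advancing the replica's last replication ID, the same event batch is replayed against a replica that already has the table. The sketch below is a minimal, self-contained illustration of that idempotent-replay requirement; it uses a plain Map as a hypothetical stand-in for the replica's metastore and is not Hive code.

    import java.util.HashMap;
    import java.util.Map;

    public class IdempotentReplaySketch {
      public static void main(String[] args) {
        // State left behind by a failed load attempt: the table exists on the
        // replica, but the last replication ID was never advanced.
        Map<String, String> replicaTables = new HashMap<>();
        replicaTables.put("table1", "definition from failed attempt");

        // Replaying the CREATE_TABLE event must be idempotent: drop any
        // existing copy first, then create. This is the ordering the patch
        // introduces in ImportSemanticAnalyzer.createReplImportTasks().
        String tableName = "table1";
        if (replicaTables.containsKey(tableName)) {
          replicaTables.remove(tableName);                      // injected drop task
        }
        replicaTables.put(tableName, "definition from event");  // create task

        System.out.println(replicaTables);  // {table1=definition from event}
      }
    }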
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index bda2af3..61a0432 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4522,6 +4522,12 @@ private void dropPartitions(Hive db, Table tbl, DropTableDesc dropTbl) throws HiveException {
     // for dropping. Thus, we need a way to push this filter (replicationSpec.allowEventReplacementInto)
     // to the metastore to allow it to do drop a partition or not, depending on a Predicate on the
     // parameter key values.
+
+    if (tbl == null) {
+      // If the table is missing, then its partitions would have been dropped along with it. Just no-op.
+      return;
+    }
+
     for (DropTableDesc.PartSpec partSpec : dropTbl.getPartSpecs()){
       List<Partition> partitions = new ArrayList<>();
       try {
@@ -4551,7 +4557,7 @@ private void dropPartitions(Hive db, Table tbl, DropTableDesc dropTbl) throws HiveException {
       console.printInfo("Dropped the partition " + partition.getName());
       // We have already locked the table, don't lock the partitions.
       addIfAbsentByName(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK));
-    };
+    }
   }
 
   private void dropTable(Hive db, Table tbl, DropTableDesc dropTbl) throws HiveException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 89837be..0d2fafb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
@@ -92,6 +93,7 @@ private List<Task<? extends Serializable>> tasks;
     private Logger LOG;
     private Context ctx;
+    private DumpType eventType = DumpType.EVENT_UNKNOWN;
 
     public HiveConf getConf() {
       return conf;
     }
@@ -121,6 +123,14 @@ public Context getCtx() {
       return ctx;
     }
 
+    public void setEventType(DumpType eventType) {
+      this.eventType = eventType;
+    }
+
+    public DumpType getEventType() {
+      return eventType;
+    }
+
     public SemanticAnalyzerWrapperContext(HiveConf conf, Hive db,
                                          HashSet<ReadEntity> inputs,
                                          HashSet<WriteEntity> outputs,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 8b639f7..832f660 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -47,12 +47,14 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -438,6 +440,13 @@ private static boolean isAcid(Long writeId) {
     return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
   }
 
+  private static Task<?> dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x,
+      ReplicationSpec replicationSpec) {
+    DropTableDesc dropTblDesc = new DropTableDesc(table.getTableName(), table.getTableType(),
+        true, false, replicationSpec);
+    return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), dropTblDesc), x.getConf());
+  }
+
   private static Task<? extends Serializable> alterTableTask(ImportTableDesc tableDesc,
       EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) {
     tableDesc.setReplaceMode(true);
@@ -912,7 +921,7 @@ private static void createReplImportTasks(
       UpdatedMetaDataTracker updatedMetadata)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
-    Task<?> dr = null;
+    Task<?> dropTblTask = null;
     WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
 
     // Normally, on import, trying to create a table or a partition in a db that does not yet exist
@@ -934,6 +943,15 @@ private static void createReplImportTasks(
             tblDesc.getDatabaseName(), tblDesc.getTableName());
         return;
       }
+
+      // If the table exists and we have found a valid create table event, then we need to drop the
+      // table first and then create it. This case is possible if the event sequence is
+      // drop_table(t1) -> create_table(t1). We need to drop here to handle the case where the
+      // previous incremental load created the table but didn't set the last repl ID due to some failure.
+      if (x.getEventType() == DumpType.EVENT_CREATE_TABLE) {
+        dropTblTask = dropTableTask(table, x, replicationSpec);
+        table = null;
+      }
     } else {
       // If table doesn't exist, allow creating a new one only if the database state is older than the update.
       if ((parentDb != null) && (!replicationSpec.allowReplacementInto(parentDb.getParameters()))) {
@@ -1000,8 +1018,15 @@
           t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),
               replicationSpec, x, writeId, stmtId, isSourceMm));
         }
       }
-      // Simply create
-      x.getTasks().add(t);
+
+      if (dropTblTask != null) {
+        // Drop first and then create
+        dropTblTask.addDependentTask(t);
+        x.getTasks().add(dropTblTask);
+      } else {
+        // Simply create
+        x.getTasks().add(t);
+      }
     } else {
       // Table existed, and is okay to replicate into, not dropping and re-creating.
       if (table.isPartitioned()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
index 4cd75d8..7f6e80a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -36,6 +36,7 @@
       new EximUtil.SemanticAnalyzerWrapperContext(
           context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log,
           context.nestedContext);
+      x.setEventType(context.dmd.getDumpType());
 
       // REPL LOAD is not partition level. It is always DB or table level. So, passing null for partition specs.
       // Also, REPL LOAD doesn't support external table and hence no location set as well.
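Reviewer note: the createReplImportTasks() change wires the new drop task in front of the create task via Task.addDependentTask(), so only the drop is registered as a root task and the create can never run first. The following self-contained sketch illustrates that chaining pattern; SimpleTask is a hypothetical stand-in for Hive's Task<? extends Serializable>, not the real class.

    import java.util.ArrayList;
    import java.util.List;

    public class TaskChainSketch {
      // Hypothetical stand-in for Hive's Task: runs itself, then its dependents.
      static class SimpleTask {
        private final String name;
        private final List<SimpleTask> dependents = new ArrayList<>();

        SimpleTask(String name) {
          this.name = name;
        }

        void addDependentTask(SimpleTask task) {
          dependents.add(task);
        }

        void run() {
          System.out.println("running: " + name);
          for (SimpleTask dependent : dependents) {
            dependent.run();
          }
        }
      }

      public static void main(String[] args) {
        // Mirrors the patch: when a CREATE_TABLE event finds the table already
        // present on the replica, the drop task becomes the root and the create
        // task hangs off it, guaranteeing drop-before-create.
        SimpleTask dropTblTask = new SimpleTask("DROP TABLE t1");
        SimpleTask createTblTask = new SimpleTask("CREATE TABLE t1");
        dropTblTask.addDependentTask(createTblTask);

        List<SimpleTask> rootTasks = new ArrayList<>();
        rootTasks.add(dropTblTask);  // only the drop is a root task
        for (SimpleTask root : rootTasks) {
          root.run();
        }
        // Prints:
        //   running: DROP TABLE t1
        //   running: CREATE TABLE t1
      }
    }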