diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 76e8f6c..2e4028f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -256,8 +256,7 @@ public void testIncrementalAdds() throws IOException { printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'"); - run("REPL STATUS " + dbName + "_dupe"); - verifyResults(new String[] {incrementalDumpId}); + verifyRun("REPL STATUS " + dbName + "_dupe", incrementalDumpId); // VERIFY tables and partitions on destination for equivalence. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 53ea346..79a6a45 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -426,7 +426,9 @@ private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception { // we will, however, do so here, now, for dev/debug's sake. Path dataPath = new Path(evRoot, "data"); rootTasks.add(ReplCopyTask.getDumpCopyTask(replicationSpec, qlMdTable.getPath(), dataPath , conf)); - (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid)).write(); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); break; } case MessageFactory.ADD_PARTITION_EVENT : { @@ -478,7 +480,9 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition replicationSpec, qlPtn.getPartitionPath(), ptnDataPath, conf)); } - (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid)).write(); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); break; } case MessageFactory.DROP_TABLE_EVENT : { @@ -890,6 +894,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { // in which case we honour it. However, having this be a pattern is an error. // Ditto for tblNameOrPattern. + int alterCounter = 0; if (evstage > 0){ if ((tblNameOrPattern != null) && (!tblNameOrPattern.isEmpty())){ @@ -921,6 +926,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { Task updateReplIdTask = TaskFactory.get( new DDLWork(inputs, outputs, alterTblDesc), conf); taskChainTail.addDependentTask(updateReplIdTask); + alterCounter++; taskChainTail = updateReplIdTask; } for (String dbName : dbsUpdated.keySet()){ @@ -932,8 +938,10 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { Task updateReplIdTask = TaskFactory.get( new DDLWork(inputs, outputs, alterDbDesc), conf); taskChainTail.addDependentTask(updateReplIdTask); + alterCounter++; taskChainTail = updateReplIdTask; } + LOG.debug("Added {} alters at the end to restore repl.last.id to parent objects",alterCounter); rootTasks.add(evTaskRoot); } @@ -952,7 +960,14 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); switch (dmd.getDumpType()) { case EVENT_CREATE_TABLE: { - return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); + CreateTableMessage createTableMessage = md.getCreateTableMessage(dmd.getPayload()); + String actualDbName = ((dbName == null) || dbName.isEmpty() ? createTableMessage.getDB() : dbName); + String actualTblName = ((tblName == null) || tblName.isEmpty() ? createTableMessage.getTable() : tblName); + List> tasks = analyzeTableLoad( + dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); + tablesUpdated.remove(actualDbName + "." + actualTblName); + // if we've just loaded this table, then we don't need to do an alter table at the end to track it + return tasks; } case EVENT_ADD_PARTITION: { return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); @@ -972,7 +987,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { List> tasks = new ArrayList>(); tasks.add(dropTableTask); LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()); - dbsUpdated.put(actualDbName,dmd.getEventTo()); + dbsUpdated.put(actualDbName, dmd.getEventTo()); + tablesUpdated.remove(actualDbName + "." + actualTblName); + // if this table just got dropped, we have no need to update it at the end. return tasks; } case EVENT_DROP_PARTITION: { @@ -1015,7 +1032,14 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } } case EVENT_ALTER_TABLE: { - return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); + AlterTableMessage alterTableMessage = md.getAlterTableMessage(dmd.getPayload()); + String actualDbName = ((dbName == null) || dbName.isEmpty() ? alterTableMessage.getDB() : dbName); + String actualTblName = ((tblName == null) || tblName.isEmpty() ? alterTableMessage.getTable() : tblName); + List> tasks = analyzeTableLoad( + dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); + tablesUpdated.remove(actualDbName + "." + actualTblName); + // if we've just loaded this table, then we don't need to do an alter table at the end to track it + return tasks; } case EVENT_RENAME_TABLE: { AlterTableMessage renameTableMessage = md.getAlterTableMessage(dmd.getPayload());