diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index ba2b3f3311..4b687d66f3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -3019,7 +3019,22 @@ public void testSkipTables() throws IOException {
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
     verifyIfTableNotExist(dbName + "_dupe", "acid_table", metaStoreClientMirror);
 
-    // // Create another table for incremental repl verification
+    // Test alter table
+    run("ALTER TABLE " + dbName + ".acid_table RENAME TO " + dbName + ".acid_table_rename", driver);
+    verifyIfTableExist(dbName, "acid_table_rename", metaStoreClient);
+
+    // Perform REPL-DUMP/LOAD
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId, driver);
+    String incrementalDumpLocn = getResult(0,0,driver);
+    String incrementalDumpId = getResult(0,1,true,driver);
+    LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
+    printOutput(driverMirror);
+    run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror);
+    verifyIfTableNotExist(dbName + "_dupe", "acid_table_rename", metaStoreClientMirror);
+
+    // Create another table for incremental repl verification
     run("CREATE TABLE " + dbName + ".acid_table_incremental (key int, value int) PARTITIONED BY (load_date date) " +
         "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
     verifyIfTableExist(dbName, "acid_table_incremental", metaStoreClient);
@@ -3027,8 +3042,8 @@ public void testSkipTables() throws IOException {
     // Perform REPL-DUMP/LOAD
     advanceDumpDir();
     run("REPL DUMP " + dbName + " FROM " + replDumpId, driver);
-    String incrementalDumpLocn = getResult(0,0,driver);
-    String incrementalDumpId = getResult(0,1,true,driver);
+    incrementalDumpLocn = getResult(0,0,driver);
+    incrementalDumpId = getResult(0,1,true,driver);
     LOG.info("Incremental-dump: Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
     printOutput(driverMirror);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 72368af83b..def83842ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -44,13 +44,9 @@ protected AddPartitionHandler(NotificationEvent notificationEvent) {
 
   @Override
   public void handle(Context withinContext) throws Exception {
-    AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
     LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
-    Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
-    if ((ptns == null) || (!ptns.iterator().hasNext())) {
-      LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions");
-      return;
-    }
+
+    AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
     org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
     if (tobj == null) {
       LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed");
@@ -58,6 +54,16 @@ public void handle(Context withinContext) throws Exception {
     }
 
     final Table qlMdTable = new Table(tobj);
+    if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) {
+      return;
+    }
+
+    Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
+    if ((ptns == null) || (!ptns.iterator().hasNext())) {
+      LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions", fromEventId());
+      return;
+    }
+
     Iterable<Partition> qlPtns = Iterables.transform(
         ptns,
         new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index a9db135dc2..58df6650fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -87,9 +87,13 @@ private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition bef
   public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
 
+    Table qlMdTable = new Table(tableObject);
+    if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) {
+      return;
+    }
+
     if (Scenario.ALTER == scenario) {
       withinContext.replicationSpec.setIsMetadataOnly(true);
-      Table qlMdTable = new Table(tableObject);
       List<Partition> partitions = new ArrayList<>();
       partitions.add(new Partition(qlMdTable, after));
       Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index ab9a9de373..4e3ce0e8cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 class AlterTableHandler extends AbstractEventHandler {
+  private final org.apache.hadoop.hive.metastore.api.Table before;
   private final org.apache.hadoop.hive.metastore.api.Table after;
   private final boolean isTruncateOp;
   private final Scenario scenario;
 
@@ -58,7 +59,7 @@ DumpType dumpType() {
   AlterTableHandler(NotificationEvent event) throws Exception {
     super(event);
     AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
-    org.apache.hadoop.hive.metastore.api.Table before = atm.getTableObjBefore();
+    before = atm.getTableObjBefore();
     after = atm.getTableObjAfter();
     isTruncateOp = atm.getIsTruncateOp();
     scenario = scenarioType(before, after);
@@ -76,23 +77,28 @@ private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
 
   @Override
   public void handle(Context withinContext) throws Exception {
-    {
-      LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
-      if (Scenario.ALTER == scenario) {
-        withinContext.replicationSpec.setIsMetadataOnly(true);
-        Table qlMdTableAfter = new Table(after);
-        Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
-        EximUtil.createExportDump(
-            metaDataPath.getFileSystem(withinContext.hiveConf),
-            metaDataPath,
-            qlMdTableAfter,
-            null,
-            withinContext.replicationSpec);
-      }
-      DumpMetaData dmd = withinContext.createDmd(this);
-      dmd.setPayload(event.getMessage());
-      dmd.write();
LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage()); + + Table qlMdTableBefore = new Table(before); + if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTableBefore)) { + return; + } + + if (Scenario.ALTER == scenario) { + withinContext.replicationSpec.setIsMetadataOnly(true); + Table qlMdTableAfter = new Table(after); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTableAfter, + null, + withinContext.replicationSpec); } + + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 956bb08050..df852a3dce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -43,6 +43,11 @@ public void handle(Context withinContext) throws Exception { InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage()); org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(insertMsg); + + if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) { + return; + } + List qlPtns = null; if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) { qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg));