diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
index 41f2b9da3e..3d4b75c86a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.After;
@@ -114,6 +115,8 @@ static void internalBeforeClassSetup(Map<String, String> overrideConfigs) throws
       put("hive.support.concurrency", "false");
       put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
       put("hive.strict.managed.tables", "false");
+      put("hive.stats.autogather", "true");
+      put("hive.stats.column.autogather", "true");
     }};
     configsForPrimary.putAll(overrideConfigs);
     primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
@@ -504,4 +507,56 @@ public void dynamicallyConvertExternalToManagedTable() throws Throwable {
         .runFailure("alter table t1 set tblproperties('EXTERNAL'='false')")
         .runFailure("alter table t2 set tblproperties('EXTERNAL'='false')");
   }
+
+  @Test
+  public void testMigrationWithUpgrade() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
+        .run("create table texternal (id int) ")
+        .run("insert into texternal values (1)")
+        .dump(primaryDbName, null);
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("select count(*) from tacid")
+        .verifyResult("0")
+        .run("select id from texternal")
+        .verifyResult("1");
+
+    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
+    assertFalse(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "texternal")));
+
+    // Forcefully (by setting a db property) alter the table types. For the acid table, set the bootstrap acid
+    // tables flag to true. For the external table, the alter event should change the table type on the target
+    // cluster, and distcp should then copy the files.
+    primary.enableAcid();
+    primary.run("use " + primaryDbName)
+        .run("alter database " + primaryDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '')")
+        .run("insert into tacid values (1)")
+        .run("insert into texternal values (2)")
+        .run("alter table tacid set tblproperties ('transactional'='true')")
+        .run("alter table texternal SET TBLPROPERTIES('EXTERNAL'='TRUE')")
+        .run("insert into texternal values (3)")
+        .run("alter database " + primaryDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
+    primary.disableAcid();
+
+    List<String> withConfigs = new ArrayList<>();
+    withConfigs.add("'hive.repl.bootstrap.acid.tables'='true'");
+    withConfigs.add("'hive.repl.dump.include.acid.tables'='true'");
+    withConfigs.add("'hive.repl.include.external.tables'='true'");
+    withConfigs.add("'hive.distcp.privileged.doAs' = '" + UserGroupInformation.getCurrentUser().getUserName() + "'");
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId, withConfigs);
+    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+    replica.run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("select id from tacid")
+        .verifyResult("1")
+        .run("select id from texternal")
+        .verifyResults(new String[] { "1", "2", "3" });
+
+    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
+    assertTrue(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "texternal")));
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 01f8cda2ff..08df13da74 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -563,7 +563,21 @@ public void testEventCounts(String dbName, long fromEventId, Long toEventId, Int
   }
 
   public boolean isAcidEnabled() {
-    return hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY) &&
+        hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER).equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager")) {
+      return true;
+    }
+    return false;
+  }
+
+  public void disableAcid() {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+  }
+
+  public void enableAcid() {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 84bdcd7025..c74f1069db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -101,11 +101,11 @@ protected int execute(DriverContext driverContext) {
     if (work.isIncrementalLoad()) {
       return executeIncrementalLoad(driverContext);
     } else {
-      return executeBootStrapLoad(driverContext);
+      return executeBootStrapLoad(driverContext, work.isBootstrapDuringIncLoad());
     }
   }
 
-  private int executeBootStrapLoad(DriverContext driverContext) {
+  private int executeBootStrapLoad(DriverContext driverContext, boolean isBootstrapDuringInc) {
     try {
       int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
       Context context = new Context(work.dumpDirectory, conf, getHive(),
@@ -179,7 +179,7 @@ a database ( directory )
       TableEvent tableEvent = (TableEvent) next;
       LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(),
           tableContext, loadTaskTracker);
-      tableTracker = loadTable.tasks();
+      tableTracker = loadTable.tasks(isBootstrapDuringInc);
       setUpDependencies(dbTracker, tableTracker);
       if (!scope.database && tableTracker.hasTasks()) {
         scope.rootTasks.addAll(tableTracker.tasks());
@@ -264,6 +264,8 @@ a database ( directory )
           || work.getPathsToCopyIterator().hasNext();
 
       if (addAnotherLoadTask) {
+        // Pass on the bootstrap-during-incremental flag for the next iteration.
+        work.setBootstrapDuringIncLoad(isBootstrapDuringInc);
         createBuilderTask(scope.rootTasks);
       }
 
@@ -471,7 +473,7 @@ private int executeIncrementalLoad(DriverContext driverContext) {
     if (work.hasBootstrapLoadTasks()) {
       LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
           + "mode after applying all events.");
-      return executeBootStrapLoad(driverContext);
+      return executeBootStrapLoad(driverContext, true);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index c5e083142a..82933c0cd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -54,6 +54,7 @@
   private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder;
   private transient Task<? extends Serializable> rootTask;
   private final transient Iterator<DirCopyWork> pathsToCopyIterator;
+  private boolean isBootstrapDuringIncLoad = false;
 
   /*
   these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -148,4 +149,12 @@ public void setRootTask(Task<? extends Serializable> rootTask) {
   public Iterator<DirCopyWork> getPathsToCopyIterator() {
     return pathsToCopyIterator;
   }
+
+  public boolean isBootstrapDuringIncLoad() {
+    return isBootstrapDuringIncLoad;
+  }
+
+  public void setBootstrapDuringIncLoad(boolean bootstrapDuringIncLoad) {
+    isBootstrapDuringIncLoad = bootstrapDuringIncLoad;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 43a4067757..db905fa4c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -84,7 +84,7 @@ public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
     this.tracker = new TaskTracker(limiter);
   }
 
-  public TaskTracker tasks() throws Exception {
+  public TaskTracker tasks(boolean isBootstrapDuringInc) throws Exception {
     // Path being passed to us is a table dump location. We go ahead and load it in as needed.
     // If tblName is null, then we default to the table name specified in _metadata, which is good.
     // or are both specified, in which case, that's what we are intended to create the new table as.
@@ -114,7 +114,7 @@ public TaskTracker tasks() throws Exception {
     }
 
     Task<?> tblRootTask = null;
-    ReplLoadOpType loadTblType = getLoadTableType(table);
+    ReplLoadOpType loadTblType = getLoadTableType(table, isBootstrapDuringInc);
     switch (loadTblType) {
       case LOAD_NEW:
         break;
@@ -159,10 +159,20 @@ public TaskTracker tasks() throws Exception {
     return tracker;
   }
 
-  private ReplLoadOpType getLoadTableType(Table table) throws InvalidOperationException, HiveException {
+  private ReplLoadOpType getLoadTableType(Table table, boolean isBootstrapDuringInc)
+      throws InvalidOperationException, HiveException {
     if (table == null) {
       return ReplLoadOpType.LOAD_NEW;
     }
+
+    // If the user has asked for a bootstrap of transactional tables, we replace the old table if present. This
+    // makes sure that transactional info such as write ids for the table stays consistent between the source
+    // and target clusters.
+    if (isBootstrapDuringInc && AcidUtils.isTransactionalTable(table)) {
+      LOG.info("Transactional table " + table.getTableName() + " will be replaced");
+      return ReplLoadOpType.LOAD_REPLACE;
+    }
+
     if (ReplUtils.replCkptStatus(table.getDbName(), table.getParameters(), context.dumpDirectory)) {
       return ReplLoadOpType.LOAD_SKIP;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index 4deb551617..e91aff3c3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 
@@ -91,6 +92,14 @@ public void handle(Context withinContext) throws Exception {
       return;
     }
 
+    if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+      if (!AcidUtils.isTransactionalTable(before) && AcidUtils.isTransactionalTable(after)) {
+        LOG.info("The table " + after.getTableName() + " is converted to an ACID table." +
+            " It will be replicated with bootstrap load as REPL_BOOTSTRAP_ACID_TABLES is set to true.");
+        return;
+      }
+    }
+
     if (Scenario.ALTER == scenario) {
       withinContext.replicationSpec.setIsMetadataOnly(true);
       Table qlMdTableAfter = new Table(after);
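
A condensed, standalone sketch of the load-type decision that LoadTable.getLoadTableType implements with this patch. Illustrative only: the enum mirrors ReplLoadOpType, the boolean parameters are simplified stand-ins for the real Table and checkpoint checks, and the final LOAD_REPLACE fallback is an assumption, since the tail of the method is not shown in this diff.

// LoadTypeDecisionSketch.java -- illustrative sketch, not part of the patch.
public class LoadTypeDecisionSketch {
  enum ReplLoadOpType { LOAD_NEW, LOAD_REPLACE, LOAD_SKIP }

  // tableExists stands in for "table != null" on the target; checkpointMatches
  // stands in for ReplUtils.replCkptStatus(...) passing for the current dump.
  static ReplLoadOpType decide(boolean tableExists, boolean isTransactional,
      boolean isBootstrapDuringInc, boolean checkpointMatches) {
    if (!tableExists) {
      return ReplLoadOpType.LOAD_NEW;
    }
    // The branch added by this patch: a transactional table bootstrapped during
    // an incremental load is always replaced, so that write ids and other
    // transactional metadata stay consistent with the source cluster.
    if (isBootstrapDuringInc && isTransactional) {
      return ReplLoadOpType.LOAD_REPLACE;
    }
    if (checkpointMatches) {
      return ReplLoadOpType.LOAD_SKIP;
    }
    return ReplLoadOpType.LOAD_REPLACE; // assumed fallback for stale tables
  }

  public static void main(String[] args) {
    // Even with a matching checkpoint, an existing ACID table is replaced
    // when the incremental load carries bootstrapped ACID tables.
    System.out.println(decide(true, true, true, true)); // prints LOAD_REPLACE
  }
}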