diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index c6f8e401b7..c0698311ed 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -662,35 +662,6 @@ public Boolean apply(@Nullable CallerArguments args) {
     }
   }
 
-  @Test
-  public void retryIncBootstrapExternalTablesFromDifferentDumpWithoutCleanTablesConfig() throws Throwable {
-    List<String> dumpWithClause = Collections.singletonList(
-        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
-    );
-    List<String> loadWithClause = externalTableBasePathWithClause();
-
-    WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary
-        .dump(primaryDbName, null, dumpWithClause);
-
-    replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause);
-
-    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
-    WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName)
-        .run("create external table t1 (id int)")
-        .run("insert into table t1 values (1)")
-        .run("create table t2 as select * from t1")
-        .dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
-    WarehouseInstance.Tuple tupleNewIncWithExternalBootstrap
-        = primary.dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
-
-    replica.load(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause);
-
-    // Re-bootstrapping from different bootstrap dump without clean tables config should fail.
-    replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause,
-        ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
-  }
-
   @Test
   public void testExternalTableDataPath() throws Exception {
     HiveConf conf = primary.getConf();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
index 41f2b9da3e..88b7465daf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.After;
@@ -114,6 +115,8 @@ static void internalBeforeClassSetup(Map<String, String> overrideConfigs) throws
       put("hive.support.concurrency", "false");
       put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
       put("hive.strict.managed.tables", "false");
+      put("hive.stats.autogather", "true");
+      put("hive.stats.column.autogather", "true");
     }};
     configsForPrimary.putAll(overrideConfigs);
     primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
@@ -504,4 +507,61 @@ public void dynamicallyConvertExternalToManagedTable() throws Throwable {
         .runFailure("alter table t1 set tblproperties('EXTERNAL'='false')")
         .runFailure("alter table t2 set tblproperties('EXTERNAL'='false')");
   }
+
+  @Test
+  public void testMigrationWithUpgrade() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
+            .run("insert into tacid values (3)")
+            .run("create table texternal (id int) ")
+            .run("insert into texternal values (1)")
+            .dump(primaryDbName, null);
+    replica.load(replicatedDbName, tuple.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select id from tacid")
+            .verifyResult("3")
+            .run("select id from texternal")
+            .verifyResult("1");
+
+    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
+    assertFalse(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "texternal")));
+
+    // Forcefully (by toggling the db property) alter the table type, to mock the upgrade done by
+    // HiveStrictManagedMigration. For the ACID table, bootstrap of ACID tables is then requested during the
+    // incremental dump. For the external table, the alter event should change the table type on the target
+    // cluster, after which distcp copies the files.
+    primary.enableAcid();
+    primary.run("use " + primaryDbName)
+            .run("alter database " + primaryDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '')")
+            .run("insert into tacid values (1)")
+            .run("insert into texternal values (2)")
+            .run("alter table tacid set tblproperties ('transactional'='true')")
+            .run("alter table texternal SET TBLPROPERTIES('EXTERNAL'='TRUE')")
+            .run("insert into texternal values (3)")
+            .run("alter database " + primaryDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
+
+    assertTrue(isFullAcidTable(primary.getTable(primaryDbName, "tacid")));
+    assertTrue(MetaStoreUtils.isExternalTable(primary.getTable(primaryDbName, "texternal")));
+
+    primary.disableAcid();
+
+    List<String> withConfigs = new ArrayList<>();
+    withConfigs.add("'hive.repl.bootstrap.acid.tables'='true'");
+    withConfigs.add("'hive.repl.dump.include.acid.tables'='true'");
+    withConfigs.add("'hive.repl.include.external.tables'='true'");
+    withConfigs.add("'hive.distcp.privileged.doAs' = '" + UserGroupInformation.getCurrentUser().getUserName() + "'");
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId, withConfigs);
+    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+    replica.run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select id from tacid")
+            .verifyResults(new String[] { "1", "3" })
+            .run("select id from texternal")
+            .verifyResults(new String[] { "1", "2", "3" });
+
+    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
+    assertTrue(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "texternal")));
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 01f8cda2ff..08df13da74 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -563,7 +563,21 @@ public void testEventCounts(String dbName, long fromEventId, Long toEventId, Int
   }
 
   public boolean isAcidEnabled() {
-    return hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY) &&
+        hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER).equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager")) {
+      return true;
+    }
+    return false;
+  }
+
+  public void disableAcid() {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+  }
+
+  public void enableAcid() {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
   }
 
   @Override
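Note on the WarehouseInstance hunk above: isAcidEnabled() now requires both settings that enableAcid()/disableAcid() toggle, not just the concurrency flag. A behavior-identical, slightly tighter form of the same check (a sketch, not part of the patch):

    public boolean isAcidEnabled() {
      // ACID needs concurrency support plus the transactional lock manager.
      return hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY)
          && "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"
              .equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER));
    }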
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 84bdcd7025..1206677fd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -179,7 +179,7 @@ a database ( directory )
         TableEvent tableEvent = (TableEvent) next;
         LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(), tableContext,
             loadTaskTracker);
-        tableTracker = loadTable.tasks();
+        tableTracker = loadTable.tasks(work.isBootstrapDuringIncLoad());
         setUpDependencies(dbTracker, tableTracker);
         if (!scope.database && tableTracker.hasTasks()) {
           scope.rootTasks.addAll(tableTracker.tasks());
@@ -264,6 +264,7 @@ a database ( directory )
         || work.getPathsToCopyIterator().hasNext();
 
     if (addAnotherLoadTask) {
+      // Pass the bootstrap-during-incremental-load flag on to the next iteration.
      createBuilderTask(scope.rootTasks);
    }
 
@@ -471,6 +472,7 @@ private int executeIncrementalLoad(DriverContext driverContext) {
       if (work.hasBootstrapLoadTasks()) {
         LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
             + "mode after applying all events.");
+        work.setBootstrapDuringIncLoad(true);
         return executeBootStrapLoad(driverContext);
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index c5e083142a..82933c0cd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -54,6 +54,7 @@
   private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder;
   private transient Task rootTask;
   private final transient Iterator pathsToCopyIterator;
+  private boolean isBootstrapDuringIncLoad = false;
 
   /*
     these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -148,4 +149,12 @@ public void setRootTask(Task rootTask) {
   public Iterator getPathsToCopyIterator() {
     return pathsToCopyIterator;
   }
+
+  public boolean isBootstrapDuringIncLoad() {
+    return isBootstrapDuringIncLoad;
+  }
+
+  public void setBootstrapDuringIncLoad(boolean bootstrapDuringIncLoad) {
+    isBootstrapDuringIncLoad = bootstrapDuringIncLoad;
+  }
 }
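Reading the two files above together: the flag is set once per incremental load that carries bootstrap work, rides on ReplLoadWork (note the new field is not transient, unlike its neighbours, so it stays with the work object handed to the follow-on load task built by createBuilderTask()), and is consumed per table. A self-contained toy model of that flow, using hypothetical stand-in types except for the names introduced by this patch:

    // Sketch only: Work stands in for ReplLoadWork, loadType() for LoadTable.getLoadTableType().
    class ReplLoadFlagSketch {
      enum ReplLoadOpType { LOAD_NEW, LOAD_REPLACE }

      static class Work {                        // stand-in for ReplLoadWork
        private boolean bootstrapDuringIncLoad;  // the field this patch adds
        boolean isBootstrapDuringIncLoad() { return bootstrapDuringIncLoad; }
        void setBootstrapDuringIncLoad(boolean b) { bootstrapDuringIncLoad = b; }
      }

      static ReplLoadOpType loadType(boolean tableExists, boolean isBootstrapDuringInc) {
        if (!tableExists) {
          return ReplLoadOpType.LOAD_NEW;
        }
        // Mirrors the LoadTable change below: re-bootstrapping during an incremental load replaces the table.
        return isBootstrapDuringInc ? ReplLoadOpType.LOAD_REPLACE : ReplLoadOpType.LOAD_NEW;
      }

      public static void main(String[] args) {
        Work work = new Work();
        work.setBootstrapDuringIncLoad(true);    // as executeIncrementalLoad() does before executeBootStrapLoad()
        System.out.println(loadType(true, work.isBootstrapDuringIncLoad())); // prints LOAD_REPLACE
      }
    }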
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 43a4067757..00c7be75c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -84,7 +84,7 @@ public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
     this.tracker = new TaskTracker(limiter);
   }
 
-  public TaskTracker tasks() throws Exception {
+  public TaskTracker tasks(boolean isBootstrapDuringInc) throws Exception {
     // Path being passed to us is a table dump location. We go ahead and load it in as needed.
     // If tblName is null, then we default to the table name specified in _metadata, which is good.
     // or are both specified, in which case, that's what we are intended to create the new table as.
@@ -114,7 +114,7 @@ public TaskTracker tasks() throws Exception {
     }
 
     Task tblRootTask = null;
-    ReplLoadOpType loadTblType = getLoadTableType(table);
+    ReplLoadOpType loadTblType = getLoadTableType(table, isBootstrapDuringInc);
     switch (loadTblType) {
       case LOAD_NEW:
         break;
@@ -159,10 +159,21 @@ public TaskTracker tasks() throws Exception {
     return tracker;
   }
 
-  private ReplLoadOpType getLoadTableType(Table table) throws InvalidOperationException, HiveException {
+  private ReplLoadOpType getLoadTableType(Table table, boolean isBootstrapDuringInc)
+      throws InvalidOperationException, HiveException {
     if (table == null) {
       return ReplLoadOpType.LOAD_NEW;
     }
+
+    // In case the user has asked for bootstrap of a table during an incremental load, we replace the old one if
+    // present. This makes sure that transactional metadata, such as write ids, stays consistent for the table
+    // between the source and target cluster. It also avoids a table type mismatch between the target and source
+    // cluster in case migration and upgrade use different conversion rules.
+    if (isBootstrapDuringInc) {
+      LOG.info("Table " + table.getTableName() + " will be replaced as bootstrap is requested during incremental load");
+      return ReplLoadOpType.LOAD_REPLACE;
+    }
+
     if (ReplUtils.replCkptStatus(table.getDbName(), table.getParameters(), context.dumpDirectory)) {
       return ReplLoadOpType.LOAD_SKIP;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index 4deb551617..f02cbb885f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 
@@ -91,6 +92,14 @@ public void handle(Context withinContext) throws Exception {
       return;
     }
 
+    if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+      if (!AcidUtils.isTransactionalTable(before) && AcidUtils.isTransactionalTable(after)) {
+        LOG.info("The table " + after.getTableName() + " is converted to an ACID table. It will be replicated"
+            + " with bootstrap load, as hive.repl.bootstrap.acid.tables is set to true.");
+        return;
+      }
+    }
+
     if (Scenario.ALTER == scenario) {
       withinContext.replicationSpec.setIsMetadataOnly(true);
       Table qlMdTableAfter = new Table(after);
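The dump-side change in AlterTableHandler is the counterpart of the load-side LOAD_REPLACE in LoadTable: when hive.repl.bootstrap.acid.tables is set and a table has just turned transactional, the alter event is suppressed because the table will be re-bootstrapped in full, and the load side then replaces the existing copy instead of altering it. The handshake, condensed from the two hunks above (a sketch of the relevant branches, not complete methods):

    // Dump side (AlterTableHandler.handle): skip the event; the table lands in the
    // bootstrap section of the incremental dump instead.
    if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
        && !AcidUtils.isTransactionalTable(before) && AcidUtils.isTransactionalTable(after)) {
      return;
    }

    // Load side (LoadTable.getLoadTableType): a table bootstrapped during an incremental
    // load replaces the existing one, keeping its write ids consistent with the source.
    if (isBootstrapDuringInc) {
      return ReplLoadOpType.LOAD_REPLACE;
    }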