diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index ab818349b5..4b5452535f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -331,7 +331,7 @@ private void loadAndVerify(String replDbName, String sourceDbNameOrPattern, Stri
    * appropriately. This tests bootstrap behaviour primarily.
    */
   @Test
-  public void testBasic() throws IOException {
+  public void testBasic() throws IOException, SemanticException {
     String name = testName.getMethodName();
     String dbName = createDB(name, driver);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
@@ -487,7 +487,7 @@ private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIn
     }
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceDb, replicadb,
         null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
-        0L, metricCollector);
+        0L, metricCollector, false);
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext());
     replLoadTask.executeTask(null);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 9d5e8af30c..4d47254e0c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import org.junit.Assert;
@@ -46,6 +47,7 @@
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests Table level replication scenarios.
@@ -156,6 +158,10 @@ private String replicateAndVerify(String replPolicy, String oldReplPolicy, Strin
 
     WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy, dumpWithClause);
 
+    DumpMetaData dumpMetaData = new DumpMetaData(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), conf);
+    Assert.assertEquals(oldReplPolicy != null && !replPolicy.equals(oldReplPolicy),
+        dumpMetaData.isReplScopeModified());
+
     if (bootstrappedTables != null) {
       verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables);
     }
@@ -163,6 +169,8 @@ private String replicateAndVerify(String replPolicy, String oldReplPolicy, Strin
     // If the policy contains '.'' means its table level replication.
     verifyTableListForPolicy(tuple.dumpLocation, replPolicy.contains(".'") ?
             expectedTables : null);
+    verifyDumpMetadata(replPolicy, new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR));
+
     replica.load(replicatedDbName, replPolicy, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
@@ -180,6 +188,21 @@ private String replicateAndVerify(String replPolicy, String oldReplPolicy, Strin
     return tuple.lastReplicationId;
   }
 
+  private void verifyDumpMetadata(String replPolicy, Path dumpPath) throws SemanticException {
+    String[] parseReplPolicy = replPolicy.split("\\.'");
+    assertEquals(parseReplPolicy[0], new DumpMetaData(dumpPath, conf).getReplScope().getDbName());
+    if (parseReplPolicy.length > 1) {
+      parseReplPolicy[1] = parseReplPolicy[1].replaceAll("'", "");
+      assertEquals(parseReplPolicy[1],
+          new DumpMetaData(dumpPath, conf).getReplScope().getIncludedTableNames());
+    }
+    if (parseReplPolicy.length > 2) {
+      parseReplPolicy[2] = parseReplPolicy[2].replaceAll("'", "");
+      assertEquals(parseReplPolicy[2],
+          new DumpMetaData(dumpPath, conf).getReplScope().getExcludedTableNames());
+    }
+  }
+
   private String replicateAndVerifyClearDump(String replPolicy, String oldReplPolicy,
                                              String lastReplId,
                                              List<String> dumpWithClause, List<String> loadWithClause,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 1fce791cbe..7e690fce35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -54,7 +54,6 @@
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.orc.ExternalCache;
 import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
@@ -604,12 +603,11 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
       replLogger.endLog(lastReplId.toString());
       LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
       long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
-      dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId);
+      dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId,
+          work.oldReplScope != null);
       // If repl policy is changed (oldReplScope is set), then pass the current replication policy,
       // so that REPL LOAD would drop the tables which are not included in current policy.
-      if (work.oldReplScope != null) {
-        dmd.setReplScope(work.replScope);
-      }
+      dmd.setReplScope(work.replScope);
       dmd.write(true);
       int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
       try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize);
@@ -940,7 +938,9 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
       LOG.info("Preparing to return {},{}->{}",
           dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
       long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
-      dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId);
+      dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId,
+          work.oldReplScope != null);
+      dmd.setReplScope(work.replScope);
       dmd.write(true);
       work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator());
       setDataCopyIterators(extTableFileList, managedTblList);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index e7245bdd63..2f416731ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -592,7 +592,9 @@ private void createBuilderTask(List<Task<?>> rootTasks) {
   private int executeIncrementalLoad() throws Exception {
     // If replication policy is changed between previous and current repl load, then drop the tables
     // that are excluded in the new replication policy.
-    dropTablesExcludedInReplScope(work.currentReplScope);
+    if (work.replScopeModified) {
+      dropTablesExcludedInReplScope(work.currentReplScope);
+    }
     IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
     // If incremental events are already applied, then check and perform if need to bootstrap any tables.
     if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 376fd7caa1..a52dac27b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -58,6 +58,7 @@
   private String sourceDbName;
   private Long dumpExecutionId;
   private final transient ReplicationMetricCollector metricCollector;
+  final boolean replScopeModified;
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
 
@@ -78,7 +79,8 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
                      String sourceDbName, String dbNameToLoadIn, ReplScope currentReplScope,
                      LineageState lineageState, boolean isIncrementalDump, Long eventTo,
                      Long dumpExecutionId,
-                     ReplicationMetricCollector metricCollector) throws IOException, SemanticException {
+                     ReplicationMetricCollector metricCollector,
+                     boolean replScopeModified) throws IOException, SemanticException {
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
@@ -86,6 +88,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
     this.sourceDbName = sourceDbName;
     this.dumpExecutionId = dumpExecutionId;
     this.metricCollector = metricCollector;
+    this.replScopeModified = replScopeModified;
     // If DB name is changed during REPL LOAD, then set it instead of referring to source DB name.
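
Note: the dump and load hunks above form a handshake. ReplDumpTask records whether an old policy
was supplied alongside the current one (work.oldReplScope != null), ReplLoadWork carries that flag
across to the replica, and ReplLoadTask now drops out-of-scope tables only when the policy
genuinely changed, instead of on every incremental load. A minimal, self-contained simulation of
that gating (assumed demo names, not Hive classes; only the guard mirrors the hunk above):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ReplScopeGateDemo {
      public static void main(String[] args) {
        // Tables currently present on the replica.
        List<String> replicaTables = new ArrayList<>(Arrays.asList("t1", "t2", "t3"));
        // Tables included by the new replication policy.
        List<String> currentScope = Arrays.asList("t1", "t3");

        // Dump side: the flag is just "was an old policy supplied?",
        // i.e. work.oldReplScope != null in ReplDumpTask.
        boolean replScopeModified = true;

        // Load side: mirrors the new guard in executeIncrementalLoad() --
        // drop excluded tables only when the policy actually changed.
        if (replScopeModified) {
          replicaTables.removeIf(t -> !currentScope.contains(t));
        }
        System.out.println(replicaTables); // prints [t1, t3]
      }
    }
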
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 4c10499e5e..ed408b0c1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -372,7 +372,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
           dmd.getReplScope(), queryState.getLineageState(), evDump, dmd.getEventTo(),
           dmd.getDumpExecutionId(),
           initMetricCollection(!evDump, loadPath.toString(), replScope.getDbName(),
-              dmd.getDumpExecutionId()));
+              dmd.getDumpExecutionId()), dmd.isReplScopeModified());
       rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } else {
       LOG.warn("Previous Dump Already Loaded");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index b6d43f7946..c428ea25f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -52,6 +52,7 @@
   private final Path dumpFile;
   private final HiveConf hiveConf;
   private Long dumpExecutionId;
+  private boolean replScopeModified = false;
 
   public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
@@ -61,16 +62,18 @@ public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
   public DumpMetaData(Path dumpRoot, DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot,
       HiveConf hiveConf) {
     this(dumpRoot, hiveConf);
-    setDump(lvl, eventFrom, eventTo, cmRoot, 0L);
+    setDump(lvl, eventFrom, eventTo, cmRoot, 0L, false);
   }
 
-  public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, Long dumpExecutionId) {
+  public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, Long dumpExecutionId,
+                      boolean replScopeModified) {
     this.dumpType = lvl;
     this.eventFrom = eventFrom;
     this.eventTo = eventTo;
     this.cmRoot = cmRoot;
     this.initialized = true;
     this.dumpExecutionId = dumpExecutionId;
+    this.replScopeModified = replScopeModified;
   }
 
   public void setPayload(String payload) {
@@ -117,10 +120,10 @@ private void loadDumpFromFile() throws SemanticException {
       br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
       String line;
       if ((line = br.readLine()) != null) {
-        String[] lineContents = line.split("\t", 6);
+        String[] lineContents = line.split("\t", 7);
         setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]),
             Long.valueOf(lineContents[2]),
-            new Path(lineContents[3]), Long.valueOf(lineContents[4]));
+            new Path(lineContents[3]), Long.valueOf(lineContents[4]), Boolean.valueOf(lineContents[6]));
         setPayload(lineContents[5].equals(Utilities.nullStringOutput) ? null : lineContents[5]);
       } else {
         throw new IOException(
@@ -165,6 +168,11 @@ public Long getDumpExecutionId() throws SemanticException {
     return dumpExecutionId;
   }
 
+  public boolean isReplScopeModified() throws SemanticException {
+    initializeIfNot();
+    return replScopeModified;
+  }
+
   public ReplScope getReplScope() throws SemanticException {
     initializeIfNot();
     return replScope;
@@ -219,7 +227,8 @@ public void write(boolean replace) throws SemanticException {
           eventTo.toString(),
           cmRoot.toString(),
           dumpExecutionId.toString(),
-          payload)
+          payload,
+          String.valueOf(replScopeModified))
       );
       if (replScope != null) {
         listValues.add(prepareReplScopeValues());
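
Note: the DumpMetaData hunks widen the tab-separated header line of the dump metadata file from
six fields to seven, appending the new boolean after the payload; loadDumpFromFile() reads it
back with a split limit of 7 and Boolean.valueOf(lineContents[6]). A standalone sketch of that
round trip (illustrative field values only; "null" stands in for Utilities.nullStringOutput):

    import java.util.StringJoiner;

    public class DumpMetaDataLineDemo {
      public static void main(String[] args) {
        // Write order, per write() above: dumpType, eventFrom, eventTo,
        // cmRoot, dumpExecutionId, payload, replScopeModified.
        StringJoiner line = new StringJoiner("\t");
        line.add("INCREMENTAL").add("100").add("250")
            .add("/cmroot").add("42").add("null").add("true");

        // Read side, per loadDumpFromFile(): a split limit of 7 keeps the
        // trailing boolean as its own field; with the old limit of 6 it
        // would have been swallowed by the payload field.
        String[] f = line.toString().split("\t", 7);
        boolean replScopeModified = Boolean.valueOf(f[6]);
        System.out.println(f[0] + " replScopeModified=" + replScopeModified);
        // prints: INCREMENTAL replScopeModified=true
      }
    }
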