diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index c46103a254..0fc8292179 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -371,8 +371,7 @@ private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tu confTemp.set("hive.repl.enable.move.optimization", "true"); Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), replicadb, - null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId), - Collections.emptyList()); + null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId)); Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext()); replLoadTask.executeTask(null); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 81feaf5eec..42cce06ad7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -139,6 +139,7 @@ public void replicationWithoutExternalTables() throws Throwable { @Test public void externalTableReplicationWithDefaultPaths() throws Throwable { + List withClauseOptions = externalTableBasePathWithClause(); //creates external tables with partitions WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -149,12 +150,12 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("insert into table t2 partition(country='india') values ('bangalore')") .run("insert into table t2 partition(country='us') values ('austin')") .run("insert into table t2 partition(country='france') values ('paris')") - .dumpWithCommand("repl dump " + primaryDbName); + .dump(primaryDbName, withClauseOptions); // verify that the external table info is written correctly for bootstrap assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName); - List withClauseOptions = externalTableBasePathWithClause(); + replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -180,7 +181,7 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("create external table t3 (id int)") .run("insert into table t3 values (10)") .run("create external table t4 as select id from t3") - .dumpWithCommand("repl dump " + primaryDbName); + .dump(primaryDbName, withClauseOptions); // verify that the external table info is written correctly for incremental assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation); @@ -244,7 +245,7 @@ public void externalTableReplicationWithCustomPaths() throws Throwable { // Create base directory but use HDFS path without schema or authority details. // Hive should pick up the local cluster's HDFS schema/authority. 
externalTableBasePathWithClause(); - List loadWithClause = Arrays.asList( + List withClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.update'=''" @@ -254,9 +255,9 @@ public void externalTableReplicationWithCustomPaths() throws Throwable { .run("create external table a (i int, j int) " + "row format delimited fields terminated by ',' " + "location '" + externalTableLocation.toUri() + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 'a'") .verifyResults(Collections.singletonList("a")) @@ -270,11 +271,14 @@ public void externalTableReplicationWithCustomPaths() throws Throwable { outputStream.write("1,2\n".getBytes()); outputStream.write("13,21\n".getBytes()); } - primary.run("create table b (i int)") .dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause); + WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)") + .dump(primaryDbName, withClause); + + replica.load(replicatedDbName, incrementalTuple.dumpLocation, withClause) .run("select i From a") .verifyResults(new String[] { "1", "13" }) .run("select j from a") @@ -285,9 +289,9 @@ public void externalTableReplicationWithCustomPaths() throws Throwable { new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/"); primary.run("use " + primaryDbName) .run("alter table a set location '" + externalTableLocation + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("select i From a") .verifyResults(Collections.emptyList()); @@ -301,18 +305,18 @@ public void externalTableWithPartitions() throws Throwable { DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); fs.mkdirs(externalTableLocation, new FsPermission("777")); - List loadWithClause = externalTableBasePathWithClause(); + List withClause = externalTableBasePathWithClause(); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t2 (place string) partitioned by (country string) row format " + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + "'") .run("insert into t2 partition(country='india') values ('bangalore')") - .dumpWithCommand("repl dump " + primaryDbName); + .dump(primaryDbName, withClause); assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't2'") .verifyResults(new String[] { "t2" }) @@ -331,11 +335,11 @@ public void externalTableWithPartitions() throws Throwable { tuple = primary.run("use " + primaryDbName) .run("insert into t2 partition(country='australia') values ('sydney')") - .dump(primaryDbName); + .dump(primaryDbName, withClause); assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("select 
distinct(country) from t2") .verifyResults(new String[] { "india", "australia" }) @@ -358,9 +362,9 @@ public void externalTableWithPartitions() throws Throwable { primary.run("use " + primaryDbName) .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation .toString() + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("select place from t2 where country='france'") .verifyResults(new String[] { "paris" }) @@ -372,9 +376,9 @@ public void externalTableWithPartitions() throws Throwable { primary.run("use " + primaryDbName) .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("select place from t2 where country='france'") .verifyResults(new String[] {}) @@ -392,17 +396,19 @@ public void externalTableWithPartitions() throws Throwable { primary.run("use " + primaryDbName) .run("insert into table t2 partition(country='france') values ('lyon')") .run("alter table t2 set location '" + tmpLocation2 + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause); + replica.load(replicatedDbName, primaryDbName, withClause); assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); } @Test public void externalTableIncrementalReplication() throws Throwable { - WarehouseInstance.Tuple tuple = primary.dumpWithCommand("repl dump " + primaryDbName); + List withClause = externalTableBasePathWithClause(); + String replDumpCommand = "repl dump " + primaryDbName + + " WITH (" + withClause.get(0) + "," + withClause.get(1) + ")"; + WarehouseInstance.Tuple tuple = primary.dumpWithCommand(replDumpCommand); replica.load(replicatedDbName, primaryDbName); - Path externalTableLocation = new Path("/" + testName.getMethodName() + "/t1/"); DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); @@ -414,12 +420,12 @@ public void externalTableIncrementalReplication() throws Throwable { + "'") .run("alter table t1 add partition(country='india')") .run("alter table t1 add partition(country='us')") - .dump(primaryDbName); + .dump(primaryDbName, withClause); assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation); // Add new data externally, to a partition, but under the partition level top directory - // Also, it is added after dumping the events but data should be seen at target after REPL LOAD. + // Also, it is added after dumping the events so data should not be seen at target after REPL LOAD. 
Path partitionDir = new Path(externalTableLocation, "country=india"); try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { outputStream.write("pune\n".getBytes()); @@ -431,16 +437,29 @@ public void externalTableIncrementalReplication() throws Throwable { } List loadWithClause = externalTableBasePathWithClause(); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") .run("show partitions t1") .verifyResults(new String[] { "country=india", "country=us" }) .run("select place from t1 order by place") - .verifyResults(new String[] { "bangalore", "mumbai", "pune" }) + .verifyResults(new String[] {}) .verifyReplTargetProperty(replicatedDbName); + // The data should be seen after the next dump-and-load cycle. + tuple = primary.run("use " + primaryDbName) + .dump(primaryDbName, withClause); + replica.load(replicatedDbName, primaryDbName, withClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] { "country=india", "country=us" }) + .run("select place from t1 order by place") + .verifyResults(new String[] { "bangalore", "mumbai", "pune" }) + .verifyReplTargetProperty(replicatedDbName); + // Delete one of the file and update another one. fs.delete(new Path(partitionDir, "file.txt"), true); fs.delete(new Path(partitionDir, "file1.txt"), true); @@ -449,10 +468,10 @@ public void externalTableIncrementalReplication() throws Throwable { } // Repl load with zero events but external tables location info should present. - tuple = primary.dump(primaryDbName); + tuple = primary.dump(primaryDbName, withClause); assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -471,7 +490,7 @@ public void externalTableIncrementalReplication() throws Throwable { tuple = primary .run("alter table t1 drop partition (country='india')") .run("alter table t1 drop partition (country='us')") - .dump(primaryDbName); + .dump(primaryDbName, withClause); replica.load(replicatedDbName, primaryDbName) .run("select * From t1") @@ -516,7 +535,10 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { .verifyReplTargetProperty(replicatedDbName); dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + + REPLICA_EXTERNAL_BASE + "'"); + tuple = primary.run("use " + primaryDbName) .run("drop table t1") .run("create external table t3 (id int)") @@ -610,7 +632,8 @@ public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable { .verifyReplTargetProperty(replicatedDbName); dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE
+ "'"); WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName) .run("drop table t1") .run("create external table t4 (id int)") @@ -708,9 +731,11 @@ public void testExternalTableDataPath() throws Exception { @Test public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable { - List dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); + List dumpWithClause = Arrays.asList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + + REPLICA_EXTERNAL_BASE + "'"); + List loadWithClause = externalTableBasePathWithClause(); WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -827,6 +852,9 @@ public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Thro .verifyResult("t2") .verifyReplTargetProperty(replicatedDbName); + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'"); // This looks like an empty dump but it has the ALTER TABLE event created by the previous // dump. We need it here so that the next dump won't have any events. WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName); @@ -835,8 +863,6 @@ public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Thro .verifyResult(incTuple.lastReplicationId); // Take a dump with external tables bootstrapped and load it - dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName) .dump(primaryDbName, dumpWithClause); @@ -853,7 +879,7 @@ public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Thro @Test public void replicationWithTableNameContainsKeywords() throws Throwable { - List loadWithClause = externalTableBasePathWithClause(); + List withClause = externalTableBasePathWithClause(); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -864,9 +890,9 @@ public void replicationWithTableNameContainsKeywords() throws Throwable { .run("insert into table t2_constraints partition(country='india') values ('bangalore')") .run("insert into table t2_constraints partition(country='us') values ('austin')") .run("insert into table t2_constraints partition(country='france') values ('paris')") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("repl status " + replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) @@ -885,9 +911,9 @@ public void replicationWithTableNameContainsKeywords() throws Throwable { .run("create table t4_tables (id int)") .run("insert into table t4_tables values (10)") .run("insert into table t4_tables values (20)") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't3_bootstrap'") 
.verifyResults(new String[] {"t3_bootstrap"}) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java index 5c5543cfd9..6a72ae89f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java @@ -47,19 +47,18 @@ public class ExternalTableCopyTaskBuilder { private static final Logger LOG = LoggerFactory.getLogger(ExternalTableCopyTaskBuilder.class); - private final ReplLoadWork work; + private final ReplDumpWork work; private final HiveConf conf; - ExternalTableCopyTaskBuilder(ReplLoadWork work, HiveConf conf) { + ExternalTableCopyTaskBuilder(ReplDumpWork work, HiveConf conf) { this.work = work; this.conf = conf; } List> tasks(TaskTracker tracker) { List> tasks = new ArrayList<>(); - Iterator itr = work.getPathsToCopyIterator(); - while (tracker.canAddMoreTasks() && itr.hasNext()) { - DirCopyWork dirCopyWork = itr.next(); + while (tracker.canAddMoreTasks() && work.getDirCopyIterator().hasNext()) { + DirCopyWork dirCopyWork = work.getDirCopyIterator().next(); Task task = TaskFactory.get(dirCopyWork, conf); tasks.add(task); tracker.addTask(task); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 92e45b4c57..b6102d9637 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -42,7 +42,11 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; +import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -84,8 +88,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Base64; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.UUID; +import java.util.ArrayList; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; @@ -122,25 +127,30 @@ public String getName() { @Override public int execute() { try { - Hive hiveDb = getHive(); - Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), - Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() - .getBytes(StandardCharsets.UTF_8.name()))); - Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); - Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); - DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); - // Initialize ReplChangeManager instance since we will require it to encode file URI. 
- ReplChangeManager.getInstance(conf); - Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); - Long lastReplId; - if (!dumpRoot.getFileSystem(conf).exists(dumpRoot) - || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) { - lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + // First check if the external table copy work has been initialized; if so, just do that and return. + if (work.dirCopyIteratorInitialized()) { + initiateExternalTableCopyTask(); } else { - work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot)); - lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + Hive hiveDb = getHive(); + Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), + Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() + .getBytes(StandardCharsets.UTF_8.name()))); + Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); + Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); + // Initialize ReplChangeManager instance since we will require it to encode file URI. + ReplChangeManager.getInstance(conf); + Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); + Long lastReplId; + if (!dumpRoot.getFileSystem(conf).exists(dumpRoot) + || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) { + lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + } else { + work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot)); + lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + } + prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); } - prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); } catch (Exception e) { LOG.error("failed", e); setException(e); @@ -358,6 +368,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive } Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true); + List extTableLocations = new LinkedList<>(); try (Writer writer = new Writer(dumpRoot, conf)) { for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { @@ -367,7 +378,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive // Dump external table locations if required. if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) && shouldDumpExternalTableLocation()) { - writer.dataLocationDump(table); + extTableLocations.addAll(writer.dataLocationDump(table)); } // Dump the table to be bootstrapped if required. @@ -386,11 +397,23 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive } } dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); + List extTableCopyWorks = dirLocationsToCopy(extTableLocations); + work.setDirCopyIterator(extTableCopyWorks.iterator()); + initiateExternalTableCopyTask(); } return lastReplId; } + private void initiateExternalTableCopyTask() { + List> childTasks = new ArrayList<>(); + int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); + TaskTracker dirCopyTracker = new TaskTracker(maxTasks); + childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(dirCopyTracker)); + DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf))); + this.childTasks = childTasks; + } + private boolean needBootstrapAcidTablesDuringIncrementalDump() { // If acid table dump is not enabled, then no neeed to check further.
if (!ReplUtils.includeAcidTableInDump(conf)) { @@ -484,6 +507,20 @@ private void dumpTableListToDumpLocation(List tableList, Path dbRoot, St LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList); } + private List dirLocationsToCopy(List sourceLocations) + throws HiveException { + List list = new ArrayList<>(sourceLocations.size()); + String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); + // this is done to remove any scheme related information that will be present in the base path + // specifically when we are replicating to cloud storage + Path basePath = new Path(baseDir); + for (Path sourcePath : sourceLocations) { + Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath); + list.add(new ExternalTableCopyTaskBuilder.DirCopyWork(sourcePath, targetPath)); + } + return list; + } + Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception { // bootstrap case @@ -500,6 +537,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) long waitUntilTime = System.currentTimeMillis() + timeoutInMs; String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); + List extTableCopyWorks = Collections.emptyList(); for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { LOG.debug("Dumping db: " + dbName); @@ -522,6 +560,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); Exception caught = null; try (Writer writer = new Writer(dbRoot, conf)) { + List extTableLocations = new LinkedList<>(); for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); Table table = null; @@ -532,7 +571,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) if (shouldDumpExternalTableLocation() && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { LOG.debug("Adding table {} to external tables list", tblName); - writer.dataLocationDump(tableTuple.object); + extTableLocations.addAll(writer.dataLocationDump(tableTuple.object)); } dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple); @@ -547,6 +586,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) } } dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); + extTableCopyWorks = dirLocationsToCopy(extTableLocations); } catch (Exception e) { caught = e; } finally { @@ -572,9 +612,9 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); dmd.write(); + work.setDirCopyIterator(extTableCopyWorks.iterator()); + initiateExternalTableCopyTask(); - // Set the correct last repl id to return to the user - // Currently returned bootDumpBeginReplId as we don't consolidate the events after bootstrap return bootDumpBeginReplId; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 9b11bae6ba..d72db558d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; 
+import java.util.Iterator; @Explain(displayName = "Replication Dump Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, @@ -36,6 +37,7 @@ Long eventFrom; static String testInjectDumpDir = null; private Integer maxEventLimit; + private transient Iterator dirCopyIterator; public static void injectNextDumpDirForTest(String dumpDir) { testInjectDumpDir = dumpDir; @@ -87,4 +89,19 @@ void overrideLastEventToDump(Hive fromDb, long bootstrapLastId) throws Exception .debug("eventTo not specified, using current event id : {}", eventTo); } } + + public Iterator getDirCopyIterator() { + return dirCopyIterator; + } + + public void setDirCopyIterator(Iterator dirCopyIterator) { + if (dirCopyIteratorInitialized()) { + throw new IllegalStateException("Dir Copy iterator has already been initialized"); + } + this.dirCopyIterator = dirCopyIterator; + } + + public boolean dirCopyIteratorInitialized() { + return dirCopyIterator != null; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index c7aa0077a6..fddee28790 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -43,6 +43,7 @@ import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -120,11 +121,12 @@ private boolean shouldWrite() { /** * this will dump a single line per external table. it can include additional lines for the same * table if the table is partitioned and the partition location is outside the table. + * It returns a list of all the external table locations.
*/ - void dataLocationDump(Table table) - throws InterruptedException, IOException, HiveException { + List dataLocationDump(Table table) throws InterruptedException, IOException, HiveException { + List extTableLocations = new LinkedList<>(); if (!shouldWrite()) { - return; + return extTableLocations; } if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) { throw new IllegalArgumentException( @@ -134,6 +136,7 @@ void dataLocationDump(Table table) Path fullyQualifiedDataLocation = PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf)); write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); + extTableLocations.add(fullyQualifiedDataLocation); if (table.isPartitioned()) { List partitions; try { @@ -142,7 +145,7 @@ void dataLocationDump(Table table) if (e.getCause() instanceof NoSuchObjectException) { // If table is dropped when dump in progress, just skip partitions data location dump LOG.debug(e.getMessage()); - return; + return extTableLocations; } throw e; } @@ -155,9 +158,11 @@ void dataLocationDump(Table table) fullyQualifiedDataLocation = PathBuilder .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf)); write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); + extTableLocations.add(fullyQualifiedDataLocation); } } } + return extTableLocations; } private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index a2c467bafd..158b18e302 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -135,16 +135,8 @@ a database ( directory ) if (!iterator.hasNext() && constraintIterator.hasNext()) { loadingConstraint = true; } - while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()) || - (work.getPathsToCopyIterator().hasNext())) && loadTaskTracker.canAddMoreTasks()) { - // First start the distcp tasks to copy the files related to external table. The distcp tasks should be - // started first to avoid ddl task trying to create table/partition directory. Distcp task creates these - // directory with proper permission and owner. - if (work.getPathsToCopyIterator().hasNext()) { - scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker)); - break; - } - + while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) + && loadTaskTracker.canAddMoreTasks()) { BootstrapEvent next; if (!loadingConstraint) { next = iterator.next(); @@ -261,8 +253,7 @@ a database ( directory ) boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState() - || constraintIterator.hasNext() - || work.getPathsToCopyIterator().hasNext(); + || constraintIterator.hasNext(); if (addAnotherLoadTask) { createBuilderTask(scope.rootTasks); @@ -271,8 +262,7 @@ a database ( directory ) // Update last repl ID of the database only if the current dump is not incremental. If bootstrap // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change // last repl ID of the database. 
- if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.getPathsToCopyIterator().hasNext() - && !work.isIncrementalLoad()) { + if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope)); work.updateDbEventState(null); } @@ -498,7 +488,7 @@ private int executeIncrementalLoad() { IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder(); // If incremental events are already applied, then check and perform if need to bootstrap any tables. - if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) { + if (!builder.hasMoreWork()) { if (work.hasBootstrapLoadTasks()) { LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap " + "mode after applying all events."); @@ -509,20 +499,13 @@ private int executeIncrementalLoad() { List> childTasks = new ArrayList<>(); int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - // First start the distcp tasks to copy the files related to external table. The distcp tasks should be - // started first to avoid ddl task trying to create table/partition directory. Distcp task creates these - // directory with proper permission and owner. TaskTracker tracker = new TaskTracker(maxTasks); - if (work.getPathsToCopyIterator().hasNext()) { - childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(tracker)); - } else { - childTasks.add(builder.build(context, getHive(), LOG, tracker)); - } + childTasks.add(builder.build(context, getHive(), LOG, tracker)); // If there are no more events to be applied, add a task to update the last.repl.id of the // target database to the event id of the last event considered by the dump. Next // incremental cycle won't consider the events in this dump again if it starts from this id. - if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) { + if (!builder.hasMoreWork()) { // The name of the database to be loaded into is either specified directly in REPL LOAD // command i.e. when dbNameToLoadIn has a valid dbname or is available through dump // metadata during table level replication. @@ -555,10 +538,8 @@ private int executeIncrementalLoad() { } } - // Either the incremental has more work or the external table file copy has more paths to process. - // Once all the incremental events are applied and external tables file copies are done, enable - // bootstrap of tables if exist. - if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext() || work.hasBootstrapLoadTasks()) { + // Once all the incremental events are applied, enable bootstrap of tables if any exist.
+ if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) { DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf))); } this.childTasks = childTasks; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index f25c71403d..75005b9216 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -55,7 +55,6 @@ private final transient BootstrapEventsIterator bootstrapIterator; private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder; private transient Task rootTask; - private final transient Iterator pathsToCopyIterator; /* these are sessionState objects that are copied over to work to allow for parallel execution. @@ -66,8 +65,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, ReplScope currentReplScope, - LineageState lineageState, boolean isIncrementalDump, Long eventTo, - List pathsToCopyIterator) throws IOException { + LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException { sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; this.dbNameToLoadIn = dbNameToLoadIn; @@ -106,7 +104,6 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); incrementalLoadTasksBuilder = null; } - this.pathsToCopyIterator = pathsToCopyIterator.iterator(); } BootstrapEventsIterator bootstrapIterator() { @@ -153,8 +150,4 @@ IncrementalLoadTasksBuilder incrementalLoadTasksBuilder() { public void setRootTask(Task rootTask) { this.rootTask = rootTask; } - - public Iterator getPathsToCopyIterator() { - return pathsToCopyIterator; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index c2e9f883ce..20e665fc48 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -53,6 +53,7 @@ import java.util.Comparator; import java.util.List; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; @@ -403,8 +404,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(), dmd.getReplScope(), - queryState.getLineageState(), evDump, dmd.getEventTo(), - dirLocationsToCopy(loadPath, evDump)); + queryState.getLineageState(), evDump, dmd.getEventTo()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); } } catch (Exception e) {